Python Concurrency

These notes collect the Python concurrency material in one place, mainly multithreading, multiprocessing, and asyncio.

Choose the concurrency pattern that fits the scenario. Types of concurrency (here we only consider running an app on a single machine):

Parallel programming: working with multiple processor cores, suited to CPU-intensive (CPU-bound) tasks, where most of the time is spent solving the problem rather than reading from or writing to a device. A better CPU gives better performance. Use cases:

  • String operations
  • Search algorithms
  • Graphics processing

Asynchronous programming: suited to IO-intensive (IO-bound) tasks, where most of the time is spent reading from or writing to a device, either a disk or a network. It is often implemented with callback functions, or with a future, promise, or task whose completion the main thread can check. Use cases:

  • Database reads and writes
  • Web service calls
  • Copying, downloading, uploading data
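
The callback and future styles described above can be sketched with concurrent.futures. This is a minimal illustration, not a real IO client: fetch() and the name "service-a" are hypothetical stand-ins, and time.sleep() takes the place of an actual network call.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch(resource):
    """Hypothetical IO-bound call; sleeps instead of doing real IO."""
    time.sleep(0.1)
    return "data from {}".format(resource)


completed = []


def on_done(future):
    # Callback style: invoked once the future has finished.
    completed.append(future.result())


with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(fetch, "service-a")
    future.add_done_callback(on_done)
    # Future style: the main thread can poll without blocking ...
    print("done yet?", future.done())
    # ... or block until the result is available.
    result = future.result()

print(result)
```

Leaving the with block waits for the worker threads, so by then the callback has also run.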

Python Concurrency

Python has concurrency support as the diagram shows:

+---------------------------------------------------+
|                                                   |
|             concurrent.futures (3.2+)             |
|                                                   |
| +-------------------+ +------------------------+  |
| | threading (1.5+)  | | multiprocessing (2.6+) |  |
| +-------------------+ +------------------------+  |
+---------------------------------------------------+

+------------+
|  asyncio   |
|   (3.4+)   |
+------------+

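It is worth noting the difference between the subprocess and multiprocessing modules: subprocess launches and communicates with external programs, while multiprocessing runs Python code in separate worker processes.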
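
As a rough sketch of the subprocess side of that distinction: the module starts an external program and exchanges data with it through its standard streams. The child program here is simply a second Python interpreter started with sys.executable, chosen only so the example runs anywhere. multiprocessing, by contrast, would call a Python function directly in a worker process.

```python
import subprocess
import sys

# Run an external program and capture what it prints.
# check=True raises CalledProcessError if the program exits with an error.
completed_process = subprocess.run(
    [sys.executable, "-c", "print(6 * 7)"],
    capture_output=True,
    text=True,
    check=True,
)

# The only thing that comes back is text on stdout, not a Python object.
external_output = completed_process.stdout.strip()
print(external_output)
```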

Git Repo

Demo code without threads and multiprocessing: https://github.com/tim-ojo/python-concurrency-getting-started

Newer versions of pytest capture log output through a built-in logging plugin. To let the demo code's own logging configuration take effect, disable that plugin explicitly:

pytest -p no:logging

Threading

The official documentation already explains this module clearly: https://docs.python.org/3/library/threading.html

In general, if there are multiple cores, threads can run in parallel; if there is only a single core, threads share time on that core.

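A process starts with a main thread (note that the main thread is not the process itself: the process is like a container that provides resources and an environment, while threads are what actually execute tasks). The main thread spawns other worker threads. However, relying only on the basic concurrency features of threads has many pitfalls, such as thread interference and race conditions.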

A word on the limitation of Python threading: because of the GIL (Global Interpreter Lock) in CPython, only one thread can execute Python bytecode at a time, so threads give concurrency but not true parallelism. Use Python threads for IO-bound tasks rather than CPU-bound tasks.
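
A small timing sketch shows why threads still help for IO-bound work. It assumes time.sleep() is a fair stand-in for a blocking read or network call (both release the GIL while waiting). The function names and delays are illustrative only, and exact timings will vary by machine.

```python
import threading
import time


def io_task(delay):
    # time.sleep releases the GIL, much like a blocking read or network call.
    time.sleep(delay)


def run_sequential(n, delay):
    start = time.perf_counter()
    for _ in range(n):
        io_task(delay)
    return time.perf_counter() - start


def run_threaded(n, delay):
    start = time.perf_counter()
    threads = [threading.Thread(target=io_task, args=(delay,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


sequential = run_sequential(4, 0.2)
threaded = run_threaded(4, 0.2)
print("sequential: {:.2f}s, threaded: {:.2f}s".format(sequential, threaded))
```

The waits overlap in the threaded version, so it should finish in roughly the time of a single task. Replacing the sleep with a pure-Python computation would not show the same gain under the GIL.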

GIL workarounds:

  • Jython (a Python implementation that runs on the JVM)
  • IronPython
  • Python Multiprocessing
  • concurrent.futures.ProcessPoolExecutor

How are threads constructed? You can pass a callable object (function) to the constructor. The Thread class is defined as:

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

For example:

import threading

def do_some_work(val):
    print("doing some work in thread")
    print("echo: {}".format(val))

val = "text"
# pass a callable to the constructor
# args must be a tuple
t = threading.Thread(target=do_some_work, args=(val,))
# start thread t
t.start()
# the main thread waits until thread t terminates
t.join()

Alternatively, subclass Thread and override the run() method. No other methods (except for the constructor) should be overridden in a subclass; in other words, only override the __init__() and run() methods of this class.

import threading

class FibonacciThread(threading.Thread):
    def __init__(self, num):
        # always initialize the base Thread before doing anything else
        super().__init__()
        self.num = num

    def run(self):
        # assumes num >= 1
        fib = [0] * (self.num + 1)
        fib[0] = 0
        fib[1] = 1
        for i in range(2, self.num + 1):
            fib[i] = fib[i - 1] + fib[i - 2]
        print(fib[self.num])

myFibTask1 = FibonacciThread(9)
myFibTask2 = FibonacciThread(12)
myFibTask1.start()
myFibTask2.start()

myFibTask1.join()
myFibTask2.join()

Thread interference: a typical example is a bank account with concurrent deposits and withdrawals, where a race condition may occur. To synchronize threads, use a lock (primitive or reentrant):

  • primitive lock (threading.Lock): not owned by a thread, so any thread can release it.
  • reentrant lock (threading.RLock): only the holder can release it, and the same thread can acquire it multiple times.
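
The difference between the two lock types can be seen in a short sketch. The function names outer() and inner() are made up for illustration.

```python
import threading

rlock = threading.RLock()


def inner():
    # Second acquisition by the same thread; allowed for an RLock.
    with rlock:
        return "acquired twice by the same thread"


def outer():
    with rlock:
        return inner()


message = outer()
print(message)

# A primitive Lock cannot be re-acquired while it is held, even by the
# thread that holds it; a non-blocking attempt simply fails.
lock = threading.Lock()
with lock:
    reacquired = lock.acquire(blocking=False)
print("re-acquired primitive lock:", reacquired)
```

With a plain Lock, the nested call in outer() would block forever, which is why RLock suits code where locked methods call each other.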

Lock benefit: faster than other thread synchronization mechanisms.

import threading

# a Lock is not owned by any particular thread
# create it in the main thread
lock = threading.Lock()

# call in worker threads
# the with statement acquires and releases the lock automatically
with lock:
    pass  # do something

# equivalent try-finally form
lock.acquire()
try:
    pass  # do something
finally:
    lock.release()

# check whether the lock is currently acquired
lock.locked()
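
The bank account scenario mentioned earlier can be sketched as follows. BankAccount is a hypothetical class written only for this illustration. The lock makes each read-modify-write of the balance atomic with respect to the other threads.

```python
import threading


class BankAccount:
    """Hypothetical account used only to illustrate locking."""

    def __init__(self, balance=0):
        self.balance = balance
        self._lock = threading.Lock()

    def deposit(self, amount):
        # "balance += amount" is a read-modify-write; guard it with the lock.
        with self._lock:
            self.balance += amount

    def withdraw(self, amount):
        with self._lock:
            self.balance -= amount


def churn(account, times):
    # Each deposit is matched by a withdrawal, so the net change is zero.
    for _ in range(times):
        account.deposit(1)
        account.withdraw(1)


account = BankAccount(100)
threads = [threading.Thread(target=churn, args=(account, 10000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(account.balance)
```

Without the lock, interleaved updates could be lost and the final balance could drift away from the starting value.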

Semaphore: maintains a set of permits. Semaphores are often used to guard resources with limited capacity, for example, a database server.

import threading

# create in the main thread; the default initial value is 1
# BoundedSemaphore raises ValueError if release() is called more times than acquire()
maxconnections = 5
pool_sema = threading.BoundedSemaphore(maxconnections)

# call in worker threads
# the with statement acquires and releases the semaphore automatically
with pool_sema:
    # connect to the database server
    pass
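
To see the permit limit in action, the sketch below lets six threads compete for two permits and records how many were inside the guarded section at once. The stats dictionary and the sleep that stands in for database work are assumptions of this example.

```python
import threading
import time

max_connections = 2
pool_sema = threading.BoundedSemaphore(max_connections)

# Bookkeeping for the demonstration only.
stats = {"active": 0, "peak": 0}
stats_lock = threading.Lock()


def use_connection():
    with pool_sema:
        with stats_lock:
            stats["active"] += 1
            stats["peak"] = max(stats["peak"], stats["active"])
        time.sleep(0.05)  # stand-in for talking to the database server
        with stats_lock:
            stats["active"] -= 1


threads = [threading.Thread(target=use_connection) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("peak concurrent connections:", stats["peak"])
```

The peak never exceeds max_connections, however many threads are started.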

Events: This is one of the simplest mechanisms for communication between threads: one thread signals an event and other threads wait for it.

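import threading

# the internal flag is initially false
event = threading.Event()

def worker():
    # worker thread: block until the flag is set to true
    event.wait()
    print("event received")

t = threading.Thread(target=worker)
t.start()

# main thread: set the flag to true, waking every waiting thread
event.set()
t.join()
# reset the flag to false
event.clear()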

Conditions: combine a lock with event-style signaling; commonly used for the producer-consumer pattern.

import threading
from collections import deque

# items stands in for the shared resource
items = deque()
cond = threading.Condition()

def consumer():
    with cond:
        while not items:
            # wait() releases the lock while blocked
            # once awakened, it re-acquires the lock and returns
            cond.wait()
        print("consumed", items.popleft())

def producer():
    with cond:
        items.append("item")
        # notify() does not release the lock; leaving the with block does
        cond.notify()

c = threading.Thread(target=consumer)
c.start()
producer()
c.join()

This one seems particularly useful: inter-thread communication using queues. Python's queue module provides a synchronized queue class intended for threaded programming. The 4 common methods are put(), get(), task_done(), and join(). By default, put() and get() are blocking calls.

from queue import Empty, Queue
from threading import Thread

def img_down(producer):
    while True:
        try:
            # a non-blocking get raises Empty once the queue is drained
            url = producer.get(block=False)
        except Empty:
            # write some logs, then let the worker exit
            break
        # do something with url, e.g. download the image
        producer.task_done()

# image download queue
producer = Queue()
urls = ["https://image1", "https://image2", "https://image3"]
for url in urls:
    producer.put(url)

# use only 2 threads to download
num_thread = 2
for i in range(num_thread):
    t = Thread(target=img_down, args=(producer,))
    t.start()

# block until task_done() has been called for every queued url
producer.join()
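
Because get() blocks by default, workers can also just wait for items and be stopped explicitly. The variant below is a sketch under that assumption: the STOP sentinel and the squaring "work" are placeholders chosen for the example, not part of the queue API.

```python
from queue import Queue
from threading import Thread

STOP = None  # sentinel value (an assumption of this sketch) telling a worker to exit


def worker(tasks, results):
    while True:
        item = tasks.get()  # blocks until an item is available
        if item is STOP:
            tasks.task_done()
            break
        results.put(item * item)  # placeholder for real work
        tasks.task_done()


tasks = Queue()
results = Queue()
workers = [Thread(target=worker, args=(tasks, results)) for _ in range(2)]
for w in workers:
    w.start()

for n in range(5):
    tasks.put(n)
for _ in workers:
    tasks.put(STOP)  # one sentinel per worker

tasks.join()  # returns once task_done() was called for every put()
for w in workers:
    w.join()

squares = sorted(results.get() for _ in range(results.qsize()))
print(squares)
```

This form fits cases where items keep arriving over time, since an empty queue no longer means the work is finished.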

Multiprocessing

Process benefits:

  • sidesteps the GIL: each Python process has its own GIL.
  • less need for synchronization.
  • can be paused and terminated.
  • more resilient: one crash will not bring down the other processes.

multiprocessing is a package that supports spawning processes using an API similar to the threading module.

import multiprocessing

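Picklable arguments: arguments passed to a worker process must be picklable, because they are serialized in the parent process and deserialized in the child.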
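
Whether something is picklable can be checked directly with the pickle module, which is the default serializer multiprocessing uses. This is a minimal sketch with arbitrary sample values.

```python
import math
import pickle

# Plain data such as tuples, strings, and lists round-trips by value.
args = pickle.loads(pickle.dumps((3, "text", [1, 2])))

# An importable function is pickled by reference to its qualified name.
restored_sqrt = pickle.loads(pickle.dumps(math.sqrt))

# A lambda has no importable name, so the standard pickler rejects it.
try:
    pickle.dumps(lambda x: x * x)
    lambda_picklable = True
except (pickle.PicklingError, AttributeError):
    lambda_picklable = False

print(args, restored_sqrt(16.0), lambda_picklable)
```

In practice this means worker functions should be defined at module level rather than as lambdas or nested functions.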

Useful pieces: Pool with apply() and apply_async() for running functions in worker processes, and Pipe and Queue for inter-process communication.
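
A brief sketch of these pieces, assuming it is run as a script. The functions square() and report() are invented for the example, and the `if __name__ == "__main__"` guard matters on platforms that start workers by re-importing the main module.

```python
import multiprocessing


def square(x):
    return x * x


def report(conn, value):
    # Runs in the child process: send one message back, then close this end.
    conn.send(square(value))
    conn.close()


def main():
    with multiprocessing.Pool(processes=2) as pool:
        # apply() blocks until the single call returns.
        print(pool.apply(square, (3,)))
        # apply_async() returns an AsyncResult immediately; get() waits for it.
        pending = [pool.apply_async(square, (n,)) for n in range(5)]
        print([p.get(timeout=10) for p in pending])

    # Pipe() returns two connected endpoints for inter-process communication.
    parent_conn, child_conn = multiprocessing.Pipe()
    child = multiprocessing.Process(target=report, args=(child_conn, 6))
    child.start()
    print(parent_conn.recv())
    child.join()


if __name__ == "__main__":
    main()
```

multiprocessing.Queue is used in much the same way as the thread queue shown earlier, except that items are pickled on their way between processes.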

AsyncIO
