Python Advanced Tutorial: Parallel Programming

This article is part of the original booklet "Python Advanced Tutorial" (《python高阶教程》).

Why write parallel code

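Python's strength is that it lets you build algorithm prototypes quickly, but it executes relatively slowly. Take an image classification algorithm as an example: the images must be preprocessed first, and with a huge amount of data a single thread quickly becomes the performance bottleneck.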

Implementing multithreading with functions

For a simple multithreaded task, the work can be written as a plain function. This mainly relies on the Thread class from the threading module.

```python
import threading
import time
import random

def function(i):
    count = 0
    while count < 3:
        # Sleep 0-3 s to simulate a task of varying length
        time.sleep(random.randint(0, 3))
        count += 1
        print("thread %d, time %s" % (i, time.ctime(time.time())))

for i in range(2):
    # Create a thread that will call function(i), then start it
    t = threading.Thread(target=function, args=(i,))
    t.start()
```

The code consists mainly of a task function named function and a for loop.

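Inside the loop, we use the Thread class from the threading module as a template: we create an instance with function as the target and the loop variable i as its argument (args=(i,)), and then call the instance's start() method. function prints the current time three times, sleeping a random 0 to 3 seconds before each print. The random sleeps simulate different amounts of work and show that the threads run concurrently.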

The output of this code is shown below (the timestamps and the interleaving differ from run to run):

```
thread 0, time Mon Jun 18 18:37:13 2018
thread 1, time Mon Jun 18 18:37:14 2018
thread 0, time Mon Jun 18 18:37:14 2018
thread 0, time Mon Jun 18 18:37:16 2018
thread 1, time Mon Jun 18 18:37:17 2018
thread 1, time Mon Jun 18 18:37:19 2018
```

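Looking at the output, thread 0 and thread 1 are scheduled in an unpredictable order. Note that both threads belong to the same process: at any single instant only one of them is actually executing Python code (in CPython this is enforced by the GIL, discussed below), but over a longer span of time the two threads appear to run simultaneously.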
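To see this "simultaneous at the macro level" behavior directly, here is a minimal sketch (not from the original) that starts two threads which each sleep for a fixed 0.2 s, waits for them with join(), and measures the total elapsed time. The names worker and run_threads and the results list are hypothetical, chosen for illustration. Because time.sleep() releases the GIL, the sleeps overlap, so the total should be close to 0.2 s rather than 0.4 s. The sketch also shows one way to get data back from a thread: Thread ignores the target's return value, so each worker writes into its own slot of a shared list.

```python
import threading
import time

def worker(i, delay, results):
    """Hypothetical I/O-bound task; stores its result in this thread's own slot."""
    time.sleep(delay)            # sleep() releases the GIL, so the threads overlap
    results[i] = f"thread {i} done"

def run_threads(n=2, delay=0.2):
    results = [None] * n         # one slot per thread, so writes never collide
    threads = [threading.Thread(target=worker, args=(i, delay, results))
               for i in range(n)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()                 # block until every worker has finished
    elapsed = time.perf_counter() - start
    return results, elapsed

if __name__ == "__main__":
    results, elapsed = run_threads()
    print(results)
    print(f"elapsed: {elapsed:.2f} s")   # roughly 0.2 s, not 0.4 s
```

Calling join() is what makes the timing meaningful: without it, the main thread would read the clock before the workers had finished.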

Implementing multithreading with a class

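The function-based approach essentially creates several Thread instances. Alternatively, we can define our own class that inherits from threading.Thread and put the work in its run() method. The code is as follows: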

```python
import threading
import time
import random

class myThread(threading.Thread):  # inherit from threading.Thread
    def __init__(self, threadID):
        super().__init__()         # the base class must be initialized
        self.threadID = threadID

    def run(self):
        # Code in run() executes in the new thread once start() is called
        count = 0
        while count < 3:
            time.sleep(random.randint(0, 3))
            count += 1
            print("thread %d, time %s" % (self.threadID, time.ctime(time.time())))

# Create the threads
thread1 = myThread(0)
thread2 = myThread(1)
# Start the threads
thread1.start()
thread2.start()
```

This code does the same thing as the function-based version. Its output is:

```
thread 0, time Mon Jun 18 19:11:59 2018
thread 1, time Mon Jun 18 19:11:59 2018
thread 0, time Mon Jun 18 19:12:00 2018
thread 1, time Mon Jun 18 19:12:00 2018
thread 1, time Mon Jun 18 19:12:01 2018
thread 0, time Mon Jun 18 19:12:03 2018
```
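A common point of confusion with Thread subclasses is when run() actually executes in a new thread: only when start() is called. Calling run() directly is just an ordinary method call in the calling thread. The minimal sketch below records which thread executed run(); the class name WhoRunsMe, the ran_in attribute, and the demo() helper are hypothetical names used only for illustration.

```python
import threading

class WhoRunsMe(threading.Thread):
    """Thread subclass that records the name of the thread executing run()."""
    def __init__(self, name):
        super().__init__(name=name)   # always initialize the Thread base class
        self.ran_in = None

    def run(self):
        # current_thread() is whichever thread is executing this line
        self.ran_in = threading.current_thread().name

def demo():
    t1 = WhoRunsMe("worker-1")
    t1.start()        # creates a new thread, which then calls run()
    t1.join()

    t2 = WhoRunsMe("worker-2")
    t2.run()          # plain method call: runs in the caller's thread

    return t1.ran_in, t2.ran_in

if __name__ == "__main__":
    print(demo())     # ('worker-1', 'MainThread') when called from the main thread
```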

Mutual exclusion with a lock

```python
import threading

count = 0                 # shared counter
lock = threading.Lock()   # protects count

def use_lock(func):
    """Decorator: call func while holding the global lock."""
    def wrapper(*args, **kw):
        lock.acquire()
        try:
            return func(*args, **kw)
        finally:
            lock.release()    # released even if func raises
    return wrapper

def inc(func):
    """Decorator: increment the shared counter, then call func."""
    def wrapper(*args, **kw):
        global count
        count += 1            # read-modify-write, not atomic
        return func(*args, **kw)
    return wrapper

def dec(func):
    """Decorator: decrement the shared counter, then call func."""
    def wrapper(*args, **kw):
        global count
        count -= 1
        return func(*args, **kw)
    return wrapper

@use_lock
@inc
def increment_lock():
    pass

@use_lock
@dec
def decrement_lock():
    pass

@inc
def increment():
    pass

@dec
def decrement():
    pass

N = 999999                # iterations per thread

def thread1():
    for _ in range(N):
        increment_lock()

def thread2():
    for _ in range(N):
        decrement_lock()

def thread3():
    for _ in range(N):
        increment()

def thread4():
    for _ in range(N):
        decrement()

thread_ins_1 = threading.Thread(target=thread1)
thread_ins_2 = threading.Thread(target=thread2)
thread_ins_3 = threading.Thread(target=thread3)
thread_ins_4 = threading.Thread(target=thread4)

# With the lock
count = 0
thread_ins_1.start()
thread_ins_2.start()
thread_ins_1.join()
thread_ins_2.join()
print("with lock, count is", count)

# Without the lock
count = 0
thread_ins_3.start()
thread_ins_4.start()
thread_ins_3.join()
thread_ins_4.join()
print("without lock, count is", count)
```

The output of this code is:

```
with lock, count is 0
without lock, count is 32376
```

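Each pair of threads increments and decrements the shared count 999,999 times, so the correct final value is 0. With the lock, that is what we get. Without it, count += 1 and count -= 1 are read-modify-write sequences that can interleave between threads, so updates are lost and the shared value ends up corrupted. The wrong value varies from run to run, and depending on the CPython version the race may be harder to reproduce.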
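Calling acquire() and release() by hand is easy to get wrong: if the code in between raises an exception and release() is skipped, every other thread waiting for the lock blocks forever. threading.Lock also supports the with statement, which releases the lock automatically. Here is a minimal sketch of a lock-protected counter written that way; the class SafeCounter and the helpers hammer and run are hypothetical names, not part of the original code.

```python
import threading

class SafeCounter:
    """Counter whose updates are serialized by a lock."""
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def add(self, delta):
        # 'with' acquires the lock on entry and releases it on exit,
        # even if the body raises an exception
        with self._lock:
            self.value += delta

def hammer(counter, delta, n):
    for _ in range(n):
        counter.add(delta)

def run(n=100_000):
    counter = SafeCounter()
    threads = [threading.Thread(target=hammer, args=(counter, d, n))
               for d in (+1, -1)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value

if __name__ == "__main__":
    print(run())   # 0: every +1 is matched by a -1
```

Keeping the lock inside the object that owns the data, rather than as a module-level global, makes it harder to touch value without holding the lock.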

Thread synchronization with a semaphore

```python
import threading
import time
import random

# The initial value is 0, so the semaphore must be
# released before anyone can acquire it
semaphore = threading.Semaphore(0)
item = 0

def consumer():
    time.sleep(random.randint(0, 3))
    # Blocks here until the producer has called release()
    semaphore.acquire()
    print("Consumer notify: consumed item number %s" % item)

def producer():
    global item
    time.sleep(random.randint(0, 3))
    item = random.randint(0, 1000)
    print("Producer notify: produced item number %s" % item)
    # release() increments the semaphore's counter by 1,
    # waking the consumer if it is waiting in acquire()
    semaphore.release()

# Run three producer/consumer rounds
for i in range(3):
    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
```

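Here the semaphore is initialized to 0, so it has to be released before it can be acquired. The consumer therefore blocks in acquire() until the producer has stored a new item and called release(). In other words, the thread that releases the semaphore always goes first, which keeps the two threads synchronized.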

The output of the code is:

```
Producer notify: produced item number 977
Consumer notify: consumed item number 977
Producer notify: produced item number 812
Consumer notify: consumed item number 812
Producer notify: produced item number 500
Consumer notify: consumed item number 500
```
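The semaphore version passes a single item through a global variable, so it relies on exactly one producer and one consumer per round, joined before the next round starts. For a continuous stream of items, the standard library's queue.Queue is a common alternative (a different tool swapped in here, not what the original uses): it bundles the buffer with its own locking, and get() blocks until an item is available, much like acquire() on a semaphore that starts at 0. A minimal sketch; the None sentinel used to stop the consumer and the produced/consumed lists are assumptions of this example.

```python
import queue
import random
import threading

def producer(q, n, produced):
    for _ in range(n):
        item = random.randint(0, 1000)
        produced.append(item)
        print("Producer notify: produced item number %s" % item)
        q.put(item)          # thread-safe; wakes the consumer if it is waiting
    q.put(None)              # sentinel: no more items

def consumer(q, consumed):
    while True:
        item = q.get()       # blocks until an item is available
        if item is None:     # sentinel received: stop
            break
        consumed.append(item)
        print("Consumer notify: consumed item number %s" % item)

def run(n=3):
    q = queue.Queue()
    produced, consumed = [], []
    t1 = threading.Thread(target=producer, args=(q, n, produced))
    t2 = threading.Thread(target=consumer, args=(q, consumed))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    return produced, consumed

if __name__ == "__main__":
    run()
```

Because the queue is FIFO, the consumer sees the items in exactly the order they were produced, and the producer never has to wait for the consumer unless a maxsize is set on the queue.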

GIL limitations

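GIL stands for Global Interpreter Lock. It is a mutex in the CPython interpreter that prevents multiple threads from executing Python bytecode at the same time. As a result, multithreading is inefficient for CPU-bound Python programs.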
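In CPython 3.2 and later, a thread holding the GIL is asked to release it after a "switch interval" so that other threads get a turn, and this interval can be read and changed through the sys module. A minimal sketch; the 0.001 s value is an arbitrary example. Changing the interval does not remove the GIL, it only changes how often the threads take turns.

```python
import sys

default = sys.getswitchinterval()   # 0.005 s by default in CPython
print("default switch interval:", default)

sys.setswitchinterval(0.001)        # request more frequent GIL hand-offs
new = sys.getswitchinterval()
print("new switch interval:", new)

sys.setswitchinterval(default)      # restore the original setting
```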

The following code tests the effect of the GIL using multiple threads:

```python
from threading import Thread
from timeit import Timer

class threads_object(Thread):
    """Runs function_to_run() in a separate thread."""
    def run(self):
        function_to_run()

class nothreads_object(object):
    """Same interface, but run() executes in the calling thread."""
    def run(self):
        function_to_run()

def non_threaded(num_iter):
    funcs = [nothreads_object() for _ in range(int(num_iter))]
    for f in funcs:
        f.run()                 # sequential execution

def threaded(num_threads):
    funcs = [threads_object() for _ in range(int(num_threads))]
    for f in funcs:
        f.start()               # concurrent, but only one thread holds the GIL
    for f in funcs:
        f.join()

def function_to_run():
    """CPU-bound busy loop: 10,000 increments."""
    count = 0
    while count < 1e4:
        count += 1

def show_results(func_name, results):
    print("%-23s %4.6f seconds" % (func_name, results))

if __name__ == '__main__':
    repeat = 1000
    number = 1
    num_threads = [1, 2, 4, 8]
    print("Starting tests")
    for i in num_threads:
        t = Timer("non_threaded(%s)" % i, "from __main__ import non_threaded")
        best_result = min(t.repeat(repeat=repeat, number=number))
        show_results("non_threaded (%s iters)" % i, best_result)
        t = Timer("threaded(%s)" % i, "from __main__ import threaded")
        best_result = min(t.repeat(repeat=repeat, number=number))
        show_results("threaded (%s threads)" % i, best_result)
```

The results are:

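```
Starting tests
non_threaded (1 iters)  0.000940 seconds
threaded (1 threads)    0.001085 seconds
non_threaded (2 iters)  0.001863 seconds
threaded (2 threads)    0.002322 seconds
non_threaded (4 iters)  0.003775 seconds
threaded (4 threads)    0.004687 seconds
non_threaded (8 iters)  0.007560 seconds
threaded (8 threads)    0.009515 seconds
```

The results show that the multithreaded version is consistently somewhat slower than sequential execution. Because the GIL lets only one thread execute Python bytecode at a time, the threads gain nothing on this CPU-bound loop, while creating and switching between threads adds overhead.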

Multiprocessing

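Programming with multiple processes looks very similar to multithreading: multiprocessing.Process takes the same target and args arguments as threading.Thread. Because each process runs its own interpreter with its own GIL, processes can execute CPU-bound code on several cores at the same time. Note the if __name__ == '__main__': guard: it is required with the spawn start method (the default on Windows, and on macOS since Python 3.8), because each child process re-imports the main module.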

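```python
import multiprocessing
import time
import random

def function(i):
    count = 0
    while count < 3:
        time.sleep(random.randint(0, 3))
        count += 1
        print("process %d, time %s" % (i, time.ctime(time.time())))

# Child processes may re-import this module; the guard keeps them
# from starting processes of their own
if __name__ == '__main__':
    processes = []
    for i in range(2):
        p = multiprocessing.Process(target=function, args=(i,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()   # wait for both child processes to finish
```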

The output is:

```
process 1, time Tue Jun 19 14:37:49 2018
process 1, time Tue Jun 19 14:37:49 2018
process 0, time Tue Jun 19 14:37:51 2018
process 1, time Tue Jun 19 14:37:52 2018
process 0, time Tue Jun 19 14:37:53 2018
process 0, time Tue Jun 19 14:37:53 2018
```
