Advanced Python Tutorial: Parallel Programming
This article is part of the original booklet "Advanced Python Tutorial".
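Python's strength is that it lets you prototype algorithms quickly, but it does not execute them very efficiently. For example, an image classification algorithm first has to preprocess the images. With a massive amount of data, a single thread clearly becomes the performance bottleneck.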
For a simple multithreaded task, you can write the work as a function. This mainly uses the Thread class from the threading module.
import threading
import time
import random

def function(i):
    count = 0
    while count < 3:
        # Sleep 0 to 3 seconds to simulate a varying amount of work
        time.sleep(random.randint(0, 3))
        count += 1
        print("thread %d, time %s" % (i, time.ctime(time.time())))

# Create and start two threads, each running function with its own index
for i in range(2):
    t = threading.Thread(target=function, args=(i,))
    t.start()
This code consists mainly of a task function named function and a for loop.
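Inside the loop, we instantiate the Thread class from the threading module, passing function as the target and the loop variable i as its argument (args=(i,)), and then call the instance's start() method. The function prints the current time three times, but before each print it sleeps for a random interval of 0 to 3 seconds. This simulates different amounts of work and shows that the threads run concurrently.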
The output of this code is as follows:
thread 0, time Mon Jun 18 18:37:13 2018
thread 1, time Mon Jun 18 18:37:14 2018
thread 0, time Mon Jun 18 18:37:14 2018
thread 0, time Mon Jun 18 18:37:16 2018
thread 1, time Mon Jun 18 18:37:17 2018
thread 1, time Mon Jun 18 18:37:19 2018
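The output shows that thread 0 and thread 1 interleave in no fixed order. Both threads here belong to the same process. At a fine-grained level, CPython lets only one thread execute Python bytecode at any given instant because of the GIL. At a macroscopic level, the interpreter keeps switching between the threads, and a thread gives up the GIL while it sleeps, so the two threads appear to run at the same time.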
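The example above never calls join(), so the main thread does not explicitly wait for its workers. The interpreter still waits for non-daemon threads before it exits. The workers also return nothing.

Below is a minimal sketch, assuming each worker should hand back a result:

- join() blocks until a thread finishes.
- A queue.Queue carries the results back safely.
- Timing the run shows that three 0.3-second sleeps overlap instead of adding up.

The names worker and run_workers, and the 0.3-second duration, are hypothetical choices for illustration.

```python
import queue
import threading
import time

def worker(i, results):
    # Hypothetical task: "work" by sleeping, then hand a result back
    time.sleep(0.3)            # time.sleep releases the GIL while waiting
    results.put((i, i * i))    # queue.Queue is safe to use from many threads

def run_workers(n):
    results = queue.Queue()
    threads = [threading.Thread(target=worker, args=(i, results)) for i in range(n)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()               # block until this thread has finished
    elapsed = time.perf_counter() - start
    # Every worker has finished, so the queue can be drained without blocking
    collected = {}
    while not results.empty():
        i, value = results.get()
        collected[i] = value
    return collected, elapsed

collected, elapsed = run_workers(3)
print(collected)
print("elapsed: %.2f s" % elapsed)  # close to 0.3 s rather than 0.9 s
```

The order in which results arrive in the queue depends on scheduling, which is why they are collected into a dictionary keyed by worker index.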
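Implementing multithreading with a function essentially means creating several Thread instances. Alternatively, we can define a class that inherits from threading.Thread. The code is as follows: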
import threading
import time
import random

class myThread(threading.Thread):  # inherit from the parent class threading.Thread
    def __init__(self, threadID):
        super().__init__()  # initialise the Thread base class
        self.threadID = threadID

    def run(self):
        # Put the code to execute in run(); calling start() runs it in the new thread
        count = 0
        while count < 3:
            time.sleep(random.randint(0, 3))
            count += 1
            print("thread %d, time %s" % (self.threadID, time.ctime(time.time())))

# Create new threads
thread1 = myThread(0)
thread2 = myThread(1)

# Start the threads
thread1.start()
thread2.start()
This code does the same thing as the function-based version. The output is as follows:
thread 0, time Mon Jun 18 19:11:59 2018
thread 1, time Mon Jun 18 19:11:59 2018
thread 0, time Mon Jun 18 19:12:00 2018
thread 1, time Mon Jun 18 19:12:00 2018
thread 1, time Mon Jun 18 19:12:01 2018
thread 0, time Mon Jun 18 19:12:03 2018
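Subclassing Thread is convenient when a thread needs to keep its own state. As a minimal sketch, the subclass below stores its result on the instance so the caller can read it after join(). The class name SquareThread and its result attribute are hypothetical.

Note that the thread body goes in run(), but you call start(). Calling run() directly would simply execute it in the current thread.

```python
import threading

class SquareThread(threading.Thread):
    def __init__(self, n):
        super().__init__()   # the Thread base class must be initialised
        self.n = n
        self.result = None   # filled in by run()

    def run(self):
        # Executed in the new thread once start() is called
        self.result = self.n * self.n

threads = [SquareThread(n) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print([t.result for t in threads])  # [0, 1, 4, 9]
```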
When several threads modify the same shared data, access must be serialized with a lock. The example below uses threading.Lock and compares two rounds: in one, the shared counter is updated under the lock; in the other, it is updated without a lock.

import threading

count = 0                # shared counter modified by all threads
lock = threading.Lock()  # guards count in the "_lock" variants

def use_lock(func):
    # Decorator: call func while holding the global lock
    def wrapper(*arg, **kw):
        with lock:  # acquire, and always release even if func raises
            return func(*arg, **kw)
    return wrapper

def inc(func):
    # Decorator: increment the shared counter, then call func
    def wrapper(*arg, **kw):
        global count
        count += 1
        return func(*arg, **kw)
    return wrapper

def dec(func):
    # Decorator: decrement the shared counter, then call func
    def wrapper(*arg, **kw):
        global count
        count -= 1
        return func(*arg, **kw)
    return wrapper

@use_lock
@inc
def increment_lock():
    pass

@use_lock
@dec
def decrement_lock():
    pass

@inc
def increment():
    pass

@dec
def decrement():
    pass

def thread1():
    for _ in range(999999):
        increment_lock()

def thread2():
    for _ in range(999999):
        decrement_lock()

def thread3():
    for _ in range(999999):
        increment()

def thread4():
    for _ in range(999999):
        decrement()

thread_ins_1 = threading.Thread(target=thread1)
thread_ins_2 = threading.Thread(target=thread2)
thread_ins_3 = threading.Thread(target=thread3)
thread_ins_4 = threading.Thread(target=thread4)

# Round 1: both threads update count under the lock
count = 0
thread_ins_1.start()
thread_ins_2.start()
thread_ins_1.join()
thread_ins_2.join()
print("with lock, count is", count)

# Round 2: both threads update count without any lock
count = 0
thread_ins_3.start()
thread_ins_4.start()
thread_ins_3.join()
thread_ins_4.join()
print("without lock, count is", count)

The output of this code is as follows:

with lock, count is 0
without lock, count is 32376
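As you can see, the shared counter ends up corrupted when no lock is used. count += 1 and count -= 1 are read-modify-write operations, and a thread can be switched out between reading and writing, so updates from the other thread get lost. The exact wrong value varies from run to run.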
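The decorator version above hides the locking inside wrappers. A more direct sketch uses the lock as a context manager: with self._lock: acquires the lock and always releases it, even if the body raises an exception. The Counter class and the hammer helper are hypothetical names chosen for illustration.

```python
import threading

class Counter:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def add(self, delta):
        # The read-modify-write on self.value happens while holding the lock
        with self._lock:
            self.value += delta

def hammer(counter, delta, times):
    # Apply the same update many times to provoke contention
    for _ in range(times):
        counter.add(delta)

counter = Counter()
up = threading.Thread(target=hammer, args=(counter, 1, 100000))
down = threading.Thread(target=hammer, args=(counter, -1, 100000))
up.start()
down.start()
up.join()
down.join()
print(counter.value)  # 0: every increment is matched by a decrement
```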
Threads can also be synchronized with a semaphore. The following producer/consumer example uses threading.Semaphore:

import threading
import time
import random

# The initial value is 0,
# which means the semaphore has to be released before it can be acquired
semaphore = threading.Semaphore(0)
item = 0

def consumer():
    time.sleep(random.randint(0, 3))
    # If the semaphore has not been released yet, block here and wait
    semaphore.acquire()
    print("Consumer notify: consumed item number %s" % item)

def producer():
    global item
    time.sleep(random.randint(0, 3))
    item = random.randint(0, 1000)
    print("Producer notify: produced item number %s" % item)
    # Release the semaphore, i.e. increase its internal counter by 1
    semaphore.release()

# Run the producer/consumer pair 3 times
for i in range(3):
    t1 = threading.Thread(target=producer)
    t2 = threading.Thread(target=consumer)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
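Here the semaphore is initialized to 0, which means it must be released before it can be acquired. The consumer therefore blocks in acquire() until the producer has produced an item and called release(). This ordering is what synchronizes the two threads.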
The output is as follows:
Producer notify: produced item number 977
Consumer notify: consumed item number 977
Producer notify: produced item number 812
Consumer notify: consumed item number 812
Producer notify: produced item number 500
Consumer notify: consumed item number 500
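A semaphore initialised to a positive value N is also commonly used to cap how many threads may be inside a section at once. The sketch below assumes 6 threads with at most 2 running the guarded section at a time.

It uses threading.BoundedSemaphore, which raises ValueError if it is released more times than it was acquired. It also records the peak number of concurrent workers to check that the cap holds. MAX_CONCURRENT and limited_task are hypothetical names.

```python
import threading
import time

MAX_CONCURRENT = 2
slots = threading.BoundedSemaphore(MAX_CONCURRENT)
state_lock = threading.Lock()  # protects the two counters below
active = 0
peak = 0

def limited_task():
    global active, peak
    with slots:  # blocks while MAX_CONCURRENT threads already hold a slot
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # simulated work
        with state_lock:
            active -= 1

threads = [threading.Thread(target=limited_task) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("peak concurrency:", peak)
```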
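GIL stands for Global Interpreter Lock. It is a mutex in the CPython interpreter that prevents multiple threads from executing Python bytecode at the same time. As a result, multithreading is inefficient for CPU-bound Python programs. I/O-bound programs can still benefit, because a thread releases the GIL while it waits on I/O.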
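In CPython 3.2 and later, a thread holding the GIL is asked to release it once a "switch interval" has elapsed and other threads are waiting. Blocking calls such as I/O or time.sleep also release it.

The interval can be inspected and changed through the sys module, as in the small sketch below. The value 0.001 is an arbitrary example. Changing the interval alters how often threads are switched, not whether CPU-bound threads can run in parallel.

```python
import sys

default_interval = sys.getswitchinterval()   # in seconds; 0.005 by default
print("switch interval: %.4f s" % default_interval)

sys.setswitchinterval(0.001)  # request more frequent hand-offs (example value)
print("new interval: %.4f s" % sys.getswitchinterval())

sys.setswitchinterval(default_interval)  # restore the original setting
```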
The following code uses multithreading to measure the effect of the GIL:
from threading import Thread

class threads_object(Thread):
    # Runs the workload in a separate thread
    def run(self):
        function_to_run()

class nothreads_object(object):
    # Runs the same workload in the calling thread
    def run(self):
        function_to_run()

def non_threaded(num_iter):
    # Run the workload num_iter times, one after another
    funcs = []
    for i in range(int(num_iter)):
        funcs.append(nothreads_object())
    for i in funcs:
        i.run()

def threaded(num_threads):
    # Run the workload once in each of num_threads threads
    funcs = []
    for i in range(int(num_threads)):
        funcs.append(threads_object())
    for i in funcs:
        i.start()
    for i in funcs:
        i.join()

def function_to_run():
    # CPU-bound workload: a pure-Python counting loop
    count = 0
    while count < 1e4:
        count += 1

def show_results(func_name, results):
    print("%-23s %4.6f seconds" % (func_name, results))

if __name__ == '__main__':
    from timeit import Timer
    repeat = 1000
    number = 1
    num_threads = [1, 2, 4, 8]
    print("Starting tests")
    for i in num_threads:
        # Keep the best of `repeat` runs to reduce timing noise
        t = Timer("non_threaded(%s)" % i, "from __main__ import non_threaded")
        best_result = min(t.repeat(repeat=repeat, number=number))
        show_results("non_threaded (%s iters)" % i, best_result)
        t = Timer("threaded(%s)" % i, "from __main__ import threaded")
        best_result = min(t.repeat(repeat=repeat, number=number))
        show_results("threaded (%s threads)" % i, best_result)
The results are as follows:
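Starting tests
non_threaded (1 iters)  0.000940 seconds
threaded (1 threads)    0.001085 seconds
non_threaded (2 iters)  0.001863 seconds
threaded (2 threads)    0.002322 seconds
non_threaded (4 iters)  0.003775 seconds
threaded (4 threads)    0.004687 seconds
non_threaded (8 iters)  0.007560 seconds
threaded (8 threads)    0.009515 seconds

The results show that the multithreaded version is somewhat slower than sequential execution. Because of the GIL, the threads cannot run the CPU-bound loop in parallel, so threading only adds the cost of creating and switching between threads.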
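The programming model for multiprocessing is quite similar to that of multithreading. Each process has its own Python interpreter and its own GIL, so processes can execute CPU-bound code in parallel on multiple cores.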
import multiprocessing
import time
import random

def function(i):
    count = 0
    while count < 3:
        time.sleep(random.randint(0, 3))
        count += 1
        print("process %d, time %s" % (i, time.ctime(time.time())))

# The __main__ guard is required where child processes are started by
# re-importing this module (the "spawn" start method, e.g. on Windows)
if __name__ == '__main__':
    for i in range(2):
        p = multiprocessing.Process(target=function, args=(i,))
        p.start()
The output is as follows:
process 1, time Tue Jun 19 14:37:49 2018
process 1, time Tue Jun 19 14:37:49 2018
process 0, time Tue Jun 19 14:37:51 2018
process 1, time Tue Jun 19 14:37:52 2018
process 0, time Tue Jun 19 14:37:53 2018
process 0, time Tue Jun 19 14:37:53 2018
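Like the first threading example, the process example above starts workers but collects no results. For CPU-bound work, a process pool is the usual tool, because each worker process has its own interpreter and GIL.

The sketch below spreads a workload over worker processes with multiprocessing.Pool.map. It assumes a hypothetical workload, cpu_task (a sum of squares), and a hypothetical helper, run_in_pool. The if __name__ == "__main__": guard matters because platforms that spawn processes re-import the main module in each child.

```python
import multiprocessing

def cpu_task(n):
    """Hypothetical CPU-bound workload: sum of k*k for k below n."""
    total = 0
    for k in range(n):
        total += k * k
    return total

def run_in_pool(sizes, processes=4):
    # Each call to cpu_task runs in a worker process with its own GIL;
    # map() returns the results in the same order as the inputs
    with multiprocessing.Pool(processes=processes) as pool:
        return pool.map(cpu_task, sizes)

if __name__ == "__main__":
    print(run_in_pool([10**5] * 4))
```

The function passed to map() must be defined at module level so that the worker processes can locate it by name.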