网站首页 文章专栏 python高阶教程-并行编程
python高阶教程-并行编程
创建于:2021-07-04 08:19:34 更新于:2024-11-23 08:17:32 羽瀚尘 472

python高阶教程-并行编程

为什么要写并行代码

python的优势在于可以快速构建算法原型,但是执行效率不高。比如说实现一个图像的分类识别算法,我们需要对图像进行预处理。在海量数据面前,单线程明显会成为性能的瓶颈。

用函数实现多线程

如果只是简单的多线程任务,可以写成函数的形式。这主要用到了threading模块中的Thread类。

import threading 
import time 
import random 
def function(i):
      count = 0
      while count < 3:
            time.sleep(random.randint(0,3))
            count += 1
            print("thread %d, time %s" %(i, time.ctime(time.time())))
      return 
for i in range(2):
      t = threading.Thread(target=function, args=(i,))
      t.start()

这段代码主要由一个任务函数functionfor循环体构成。

在循环体中,我们以threading模块中的Thread类为模板,以function和循环体变量i为参数初始化一个实例,然后调用这个实例的start()方法。在function函数中,打印三次当前的时间,但是休眠的时间间隔是随机的。这主要是为了模拟不同的计算量,表明不同线程是并行执行的。

这段代码的执行结果如下:

thread 0, time Mon Jun 18 18:37:13 2018
thread 1, time Mon Jun 18 18:37:14 2018
thread 0, time Mon Jun 18 18:37:14 2018
thread 0, time Mon Jun 18 18:37:16 2018
thread 1, time Mon Jun 18 18:37:17 2018
thread 1, time Mon Jun 18 18:37:19 2018

观察执行结果,可以发现thread 0 和thread 1被随机调度。当然,我们这里两个线程属于同一个进程,微观上,在同一时刻还是只有一个线程被处于运行状态;宏观上,两个线程同时执行。

用类实现多线程

使用函数实现多线程,本质上是产生了多个实例。我们也可以定义一个类,用来继承threading.Thread. 代码如下:

import threading
import time
import random 
 
class myThread (threading.Thread):   #继承父类threading.Thread
    def __init__(self, threadID):
        threading.Thread.__init__(self)
        self.threadID = threadID
    def run(self):                   #把要执行的代码写到run函数里面 线程在创建后会直接运行run函数 
        count = 0
        while count < 3:
                time.sleep(random.randint(0,3))
                count += 1
                print("thread %d, time %s" %(self.threadID, time.ctime(time.time())))
        return 
 
## 创建新线程
thread1 = myThread(0)
thread2 = myThread(1)
 
## 开启线程
thread1.start()
thread2.start()

这段代码与使用函数实现多线程功能相同,执行结果如下:

thread 0, time Mon Jun 18 19:11:59 2018
thread 1, time Mon Jun 18 19:11:59 2018
thread 0, time Mon Jun 18 19:12:00 2018
thread 1, time Mon Jun 18 19:12:00 2018
thread 1, time Mon Jun 18 19:12:01 2018
thread 0, time Mon Jun 18 19:12:03 2018

使用lock实现线程互斥

import threading
import time
import random 

count = 0
lock = threading.Lock()

def use_lock(func):
    def wrapper(*arg, **kw):
        global lock 
        lock.acquire()
        res = func(*arg, **kw)
        lock.release()
        return res 
    return wrapper 
def inc(func):
    def wrapper(*arg, **kw):
        global count
        count += 1
        return func(*arg, **kw)
    return wrapper 
def dec(func):
    def wrapper(*arg, **kw):
        global count
        count -= 1
        return func(*arg, **kw)
    return wrapper 

@use_lock
@inc
def increment_lock():
    pass 

@use_lock
@dec 
def decrement_lock():
    pass 
@inc 
def increment():
    pass 
@dec 
def decrement():
    pass 

def thread1():
    count = 0
    while count < 999999:
        count += 1
        increment_lock()
def thread2():
    count = 0
    while count < 999999:
        count += 1
        decrement_lock()
def thread3():
    count = 0
    while count < 999999:
        count += 1
        increment()
def thread4():
    count = 0
    while count < 999999:
        count += 1
        decrement()


thread_ins_1 = threading.Thread(target=thread1, args=())
thread_ins_2 = threading.Thread(target=thread2, args=())
thread_ins_3 = threading.Thread(target=thread3, args=())
thread_ins_4 = threading.Thread(target=thread4, args=())

count = 0
thread_ins_1.start()
thread_ins_2.start()
thread_ins_1.join()
thread_ins_2.join()
print("wiht lock, count is ", count)

count = 0
thread_ins_3.start()
thread_ins_4.start()
thread_ins_3.join()
thread_ins_4.join()
print("without lock, count is ", count)

这段代码的执行结果如下所示:

wiht lock, count is  0
without lock, count is  32376

可以看出,没有使用锁的共享资源出现了错乱。

使用信号量实现线程同步

import threading 
import time 
import random 

## initial value is 0
## which means we have to release it before using it
semaphore = threading.Semaphore(0)
item = 0

def consumer():
    global item 
    time.sleep(random.randint(0,3))
    ## If semaphore is not released, then wait
    semaphore.acquire()
    print("Consumer notify: consumed item number %s" %item)

def producer():
    global item 
    global semaphore 
    time.sleep(random.randint(0,3))
    item = random.randint(0,1000)
    print("Producer notify: produced item number %s" %item)
    ## release semaphore, which is add it by 1
    semaphore.release()
## run for 3 times
for i in range(3):
    t1 = threading.Thread(target= producer)
    t2 = threading.Thread(target= consumer)
    t1.start()
    t2.start()

    t1.join()
    t2.join()

这里我们把信号量的值初始化为0,意味着必须先释放才能获取。那么释放信号量的线程就可以先执行,如此完成两个线程之间的同步。

代码的执行结果如下:

Producer notify: produced item number 977
Consumer notify: consumed item number 977
Producer notify: produced item number 812
Consumer notify: consumed item number 812
Producer notify: produced item number 500
Consumer notify: consumed item number 500

GIL限制

GIL全称为Global Interpreter Lock,是CPython解释器中用来防止多线程并发执行机器码的一个互斥锁。GIL会造成python的CPU密集型程序的多线程效率低下。

采用多线程来测试GIL的代码如下:

from threading import Thread 
class threads_object(Thread):
    def run(self):
        function_to_run() 
class nothreads_object(object):
    def run(self):
        function_to_run()
def non_threaded(num_iter):
    funcs = []
    for i in range(int(num_iter)):
        funcs.append(nothreads_object())
    for i in funcs:
        i.run() 
def threaded(num_threads):
    funcs = [] 
    for i in range(int(num_threads)):
        funcs.append(threads_object())
    for i in funcs:
        i.start() 
    for i in funcs:
        i.join()
def function_to_run():
    count = 0 
    while count < 1e4:
        count += 1
def show_results(func_name, results):
    print("%-23s %4.6f seconds" %(func_name, results))
if __name__ == '__main__':
    import sys 
    from timeit import Timer 

    repeat = 1000
    number = 1
    num_threads = [1,2,4,8] 
    print("Starting tests") 
    for i in num_threads:
        t = Timer("non_threaded(%s)" \
                   %i, "from __main__ import non_threaded") 
        best_result = min(t.repeat(repeat= repeat, number= number)) 
        show_results("non_threaded (%s iters)" \
                   % i, best_result) 
        t = Timer("threaded(%s)" \
                    %i, "from __main__ import threaded") 
        best_result = min(t.repeat(repeat= repeat, number= number)) 
        show_results( "threaded (%s threads)" \
                    % i, best_result)

结果如下:

Starting tests
non_threaded (1 iters)  0.000940 seconds
threaded (1 threads)    0.001085 seconds
non_threaded (2 iters)  0.001863 seconds
threaded (2 threads)    0.002322 seconds
non_threaded (4 iters)  0.003775 seconds
threaded (4 threads)    0.004687 seconds
non_threaded (8 iters)  0.007560 seconds
threaded (8 threads)    0.009515 seconds
从执行结果上看,采用多线程方案会比顺序执行慢一些。

多进程

多进程的编程模式与多线程颇为相似。

import multiprocessing
import time 
import random 
def function(i):
      count = 0
      while count < 3:
            time.sleep(random.randint(0,3))
            count += 1
            print("process %d, time %s" %(i, time.ctime(time.time())))
      return 
if __name__ == '__main__':
    for i in range(2):
        t = multiprocessing.Process(target=function, args=(i,))
        t.start()

执行结果如下:

process 1, time Tue Jun 19 14:37:49 2018
process 1, time Tue Jun 19 14:37:49 2018
process 0, time Tue Jun 19 14:37:51 2018
process 1, time Tue Jun 19 14:37:52 2018
process 0, time Tue Jun 19 14:37:53 2018
process 0, time Tue Jun 19 14:37:53 2018