博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python--multiprocessing多进程总结
阅读量:6820 次
发布时间:2019-06-26

本文共 8183 字,大约阅读时间需要 27 分钟。

  由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

  multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

但在使用这些共享API的时候,我们要注意以下几点:

  • 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
  • multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
  • 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。

Process.PID中保存有PID,如果进程还没有start(),则PID为None。

window系统下,需要注意的是要想启动一个子进程,必须加上那句if __name__ == "main",进程相关的要写在这句下面。

 

简单创建多进程:

有两种使用方法,直接传入要运行的方法或从Process继承并覆盖run():

from multiprocessing import Processimport threadingimport timedef foo(i):    print 'say hi', iif __name__ == '__main__':    for i in range(10):        p = Process(target=foo, args=(i,))        p.start()
方法一
say hi 4say hi 3say hi 5say hi 2say hi 1say hi 6say hi 0say hi 7say hi 8say hi 9Process finished with exit code 0可以看出多个进程随机顺序执行
运行结果

 

from multiprocessing import Processimport timeclass MyProcess(Process):    def __init__(self, arg):        super(MyProcess, self).__init__()        self.arg = arg    def run(self):        print 'say hi', self.arg        time.sleep(1)    if __name__ == '__main__':    for i in range(10):        p = MyProcess(i)        p.start()
方法二

 

Process类


 

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前还没有实现,库引用中提示必须是None; 

  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

属性:

  authkey

  daemon:和线程的setDeamon功能一样

  exitcode(进程在运行时为None、如果为–N,表示被信号N结束)

  name:进程名字。

  pid:进程号。

 

例子一:

from multiprocessing import Processimport threadingimport time  def foo(i):    print 'say hi',i  for i in range(10):    p = Process(target=foo,args=(i,))    p.start()
创建进程
say hi 0say hi 3say hi 6say hi 1say hi 8say hi 2say hi 5say hi 4say hi 7say hi 9Process finished with exit code 0
运行结果

例子二:

def foo(i):    time.sleep(1)    print 'say hi', i    time.sleep(1)if __name__ == '__main__':    p_list=[]    for i in range(10):        p = Process(target=foo, args=(i,))        p.daemon=True        p_list.append(p)    for p in p_list:        p.start()    for p in p_list:        p.join()    print 'main process end'
View Code
say hi 1say hi 2say hi 5say hi 6say hi 7say hi 0say hi 4say hi 3say hi 8say hi 9main process endProcess finished with exit code 0
运行结果

可以看出join()方法和deamon属性的用法和多线程的基本一致。

 

Pool类


  

  进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。进程池设置最好等于CPU核心数量

构造方法:

Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。

initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

 

实例方法:

  apply(func[, args[, kwds]]):同步进程池

  apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池

  close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。

  terminate() : 结束工作进程,不在处理未完成的任务

  join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。pool.join()必须使用在

 

例子一(异步进程池):

pool.close()或者pool.terminate()之后。其中close()跟terminate()的区别在于close()会等待池中的worker进程执行结束再关闭pool,而terminate()则是直接关闭。

# coding:utf-8from  multiprocessing import Poolimport timedef Foo(i):    time.sleep(2)    return i + 100def Bar(arg):    print argif __name__ == '__main__':    t_start=time.time()    pool = Pool(5)    for i in range(10):        pool.apply_async(func=Foo, args=(i,), callback=Bar)#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去    pool.close()    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。    pool.terminate()    t_end=time.time()    t=t_end-t_start    print 'the program time is :%s' %t
异步进程池
101100102103104106105107108109the program time is :4.22099995613Process finished with exit code 0
运行结果

例子二(同步进程池):

#!/usr/bin/env python# -*- coding:utf-8 -*-from  multiprocessing import Process, Poolimport timedef Foo(i):    time.sleep(1)    print i + 100if __name__ == '__main__':    t_start=time.time()    pool = Pool(5)    for i in range(10):        pool.apply(Foo, (i,))    pool.close()    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。    t_end=time.time()    t=t_end-t_start    print 'the program time is :%s' %t
同步进程池
100101102103104105106107108109the program time is :10.2409999371Process finished with exit code 0可以看出进程同步顺序执行了,效率降低
运行结果

例子三:异步进程池使用get()方法获得进程执行结果值(错误使用get()方法获取结果)

def Bar(arg):    return argif __name__ == '__main__':    t_start=time.time()    pool = Pool(5)    for i in range(10):        res = pool.apply_async(func=Foo, args=(i,), callback=Bar)#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去        print res.get()    pool.close()    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。    pool.terminate()    t_end=time.time()    t=t_end-t_start    print 'the program time is :%s' %t
错误使用get()使得异步变同步
100101102103104105106107108109the program time is :20.2850000858Process finished with exit code 0可以看出由于每个进程的get()方法,程序变成同步执行了
运行结果

例子四(正确使用get()方法获取结果)

# coding:utf-8from  multiprocessing import Poolimport timedef Foo(i):    time.sleep(2)    return i + 100def Bar(arg):    return argif __name__ == '__main__':    res_list=[]    t_start=time.time()    pool = Pool(5)    for i in range(10):        res = pool.apply_async(func=Foo, args=(i,), callback=Bar)        res_list.append(res)    pool.close()    pool.join()    for res in res_list:        print res.get()    t_end=time.time()    t=t_end-t_start    print 'the program time is :%s' %t
正确使用get()方法
100101102103104105106107108109the program time is :4.22399997711Process finished with exit code 0
View Code

 

进程数据共享


 

进程各自持有一份数据,默认无法共享数据

#!/usr/bin/env python# coding:utf-8from multiprocessing import Processli = []def foo(i):    li.append(i)    print 'say hi', liif __name__ == '__main__':    for i in range(10):        p = Process(target=foo, args=(i,))        p.start()    print 'ending', li#期望输出[0到10的随机排列的列表]
进程间无法共享内存数据
say hi [1]say hi [0]say hi [2]say hi [3]say hi [4]say hi [5]ending []say hi [6]say hi [7]say hi [8]say hi [9]Process finished with exit code 0
运行结果

方法一(使用Array):

Array(‘i’, range(10))中的‘i’参数C语言中的类型:

‘c’: ctypes.c_char     ‘u’: ctypes.c_wchar    ‘b’: ctypes.c_byte     ‘B’: ctypes.c_ubyte‘h’: ctypes.c_short     ‘H’: ctypes.c_ushort    ‘i’: ctypes.c_int      ‘I’: ctypes.c_uint‘l’: ctypes.c_long,    ‘L’: ctypes.c_ulong    ‘f’: ctypes.c_float    ‘d’: ctypes.c_double

 

from multiprocessing import Process, Arraydef f(a):    for i in range(len(a)):        a[i] = -a[i]if __name__ == '__main__':    arr = Array('i', range(10))    p = Process(target=f, args=(arr,))    p.start()    p.join()    print(arr[:])
Array共享数据
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
运行结果

方法二(使用Manager):

Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持。

from multiprocessing import Process, Managerdef f(d, l):    d[1] = '1'    d['2'] = 2    d[0.25] = None    l.reverse()if __name__ == '__main__':    with Manager() as manager:        d = manager.dict()        l = manager.list(range(10))        p = Process(target=f, args=(d, l))        p.start()        p.join()        print(d)        print(l)
使用manager的dict和list
{0.25: None, 1: '1', '2': 2}[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]Process finished with exit code 0
运行结果

 

转载于:https://www.cnblogs.com/tkqasn/p/5701230.html

你可能感兴趣的文章
我的友情链接
查看>>
最大连续和 Medium
查看>>
1030.在线视频—开源网管Cacti系列讲座(五)Cacti插件架构与插件安装
查看>>
Linux中exec命令相关
查看>>
asp.net mvc 如何调用微信jssdk接口:分享到微信朋友(圈)| 分享到qq空间
查看>>
Redis主从配置
查看>>
全面掌握ping命令(四)ping命令常用参数
查看>>
【C语言】编写一个程序统计输入字符串中: 各个数字、空白字符、以及其他所有字符出现的次数。...
查看>>
mysql用户管理、常用sql语句、mysql数据库备份恢复
查看>>
五大常用算法
查看>>
说说这些年做的云计算和大数据项目
查看>>
java基础第十二天_集合
查看>>
最新一代企业管理软件功能介绍
查看>>
12 种编程语言的起源故事
查看>>
Linux crond 定时任务
查看>>
linux 文件删除详解
查看>>
linux下虚拟终端terminator安装和使用
查看>>
Java多线程学习(六)Lock锁的使用
查看>>
java异常处理的机制 java 架构师学习 java北京
查看>>
史上最全Python数据类型详解
查看>>