目录
Parallel in Python
并行计算在Python中并不少见, 相关多线程,多进程,threading, Pool, Queue的介绍网上都有很多资料, 这里不做赘述. 下面主要从几个常见的问题出发, 了解一些概念之间的问题(Based on Python3).
哪个是我该用的Queue?
Queue是个在多进程中是个让人困惑的概念. 可以用多个方式导入Queue, 分别是 multiprocessing.queues.Queue, 和 multiprocessing中Manager中的Queue, 还有queue中的Queue
从Manager中导入的Queue
from multiprocessing import Manager
m = Manager()
q_from_manager = m.Queue()
Manager默认是multiprocessing中SyncManager类, SyncManager类中的Queue()方法则返回queue.Queue(). 但是并不是说由Manager生成的Queue 和 queue.Queue是一样的. 相反, 在Pool,Process和父进程通信的过程中, Manager.Queue可以很好的实现父子进程间通信, 子进程间通信, 而queue.Queue()并不能, 说明Manager通过继承的multiprocessing.managers.BaseManager中的机制来保障进程间通信, 而queue.Queue()只是实现这一机制的底层.
通过multiprecessing导入Queue
from multiprocessing import Queue
q = Queue(1)
print(type(q))
<class 'multiprocessing.queues.Queue'>
说明 from multiprocessing import Queue
导入的 Queue本质是 multiprocessing.queues.Queue, 但是后者的init方法为 __init__(self, maxsize=0, *, ctx)
, 需要更多的配置, 所以在使用上还需配置, 不推荐使用.
这样就可以把正常使用的Queue缩小到两个范围 from multiprocessing import Queue 和 Manager的Queue()方法. 下文简写为 mp.Queue 和 mgr.Queue
mp.Queue 可以用于spawn出的子进程间通信, 也可以用于 父进程和 spawn出的子进程通信,
multiprocessing.Queue只能用于显示地用Process创建的子进程间或与子进程与父进程之间通信;这段以后再写. 需要重新设计一遍对应的实验.
multiprocessing.Manager.Queue 能用于Pool创建的子进程之间通信, 也能用于父进程和子进程通信, 也能用于Process进程与Pool创建的子进程之间通信. 这个还是比较强大的功能.
选择合适的Queue
from queue import Queue
普通的队列模式
from multiprocessing import Queue
多进程并发的Queue队列, 用于解决多进程间通信问题.
如果要用进程池, 则必须使用Manager.Queue()
问题
仍留下的问题是, 如果用Pool, 还需不需要用JoinableQueue, 或者, 该怎么结束Pool中的进程? #TODO 仍需要找一个案例, 能够很好的解释如何停止Pool中的进程.
在Terminate multi process/thread in Python correctly and gracefully中有很好的如何解决Process结束的问题. 但仍有一些点需要注意.
例如, 在Problem1中提到的结束无限循环子进程的解决方案中, 使用了Process.terminate方法. 这个案例其实主要表现的是, 在子进程的join之前的所有父进程代码均会执行, 然后阻塞在join语句. 这样, 因为子进程是无限循环, 所以不会自己结束, 从而有必要在父进程中显示的调用子进程的terminate语句. 但是调用terminate的时候如果子进程的任务没有完成, 也会被立刻消灭. 这就会导致很难看的结果.
import multiprocessing
import time
def hang():
while True:
print('hanging..')
time.sleep(10)
print('after 10 , hanging...')
def main():
p = multiprocessing.Process(target=hang)
p.start()
time.sleep(1)
p.terminate()
p.join()
print('main process exiting..')
if __name__ == '__main__':
main()
子进程会在父进程休眠1秒之后执行剩下的语句, 也就是terminate, 这时子进程仍处在sleep(10)中, 所以这时候被结束, 会导致第二个print语句不被执行.
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,10几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,这时候进程池Pool发挥作用的时候就到了。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。这里有一个简单的例子
#!/usr/bin/env python
#coding=utf-8
from multiprocessing import Pool
from time import sleep
def f(x):
for i in range(10):
print '%s --- %s ' % (i, x)
sleep(1)
def main():
pool = Pool(processes=3) # set the processes max number 3
for i in range(11,20):
result = pool.apply_async(f, (i,))
pool.close()
pool.join()
if result.successful():
print 'successful'
if __name__ == "__main__":
main()
先创建容量为3的进程池,然后将f(i)依次传递给它,运行脚本后利用ps aux | grep pool.py查看进程情况,会发现最多只会有三个进程执行。pool.apply_async()用来向进程池提交目标请求,pool.join()是用来等待进程池中的worker进程执行完毕,防止主进程在worker进程结束前结束。但pool.join()必须使用在pool.close()或者pool.terminate()之后。
其中close()跟terminate()的区别在于close()会等待池中的worker进程执行结束再关闭pool,而terminate()则是直接关闭。result.successful()表示整个调用执行的状态,如果还有worker没有执行完,则会抛出AssertionError异常。
利用multiprocessing下的Pool可以很方便的同时自动处理几百或者上千个并行操作,脚本的复杂性也大大降低。
没有找到合适的方法能够让Pool中的进程自动的停止. 如果没有正规的方式的话, 估计是要用ternimate语句在父进程中显示的调用, 从而停止Pool中的子进程.
如何在thread, multiprocessing subprocess中选择合适任务的并行实现方式?
经常遇到的一个问题是, 不知道该怎样选择一个合适任务的并行实现方式. 在Stack Overflow上有个很好的解答 Deciding among subprocess, multiprocessing, and thread in Python?
从这个回答中不难看出, subprocess 往往用来提供同步反馈. 可以在主进程中spawn出一个subprocess, 在子进程中完成一些任务, 然后等待子进程的完成或读取子进程的输出. 理论上来说可以spawn出100甚至更多的subprocess来完成任务, 但是最大的缺点在于subprocess的I/O block.
multiprocessing往往是python用来实现真正并行的方式. 引用回答中的话,
multiprocessing is for running functions within your existing (Python) code with support for more flexible communications among this family of processes.
而且multiprocessing还可以规避GIL的问题, 从而方便的扩展到CPU的多核上实现并行.
而threading则是一个受用面很窄的用法, 因为它天然I/O限制, 无法扩展到多核CPU上.
综上而言, 如果只是想单独的把一个任务放在主进程之外进行, 可以用subprocess; 如果是想高效并行的话, 用multiprocessing ; 少用threading(个人意见). 想知道更多的区别, 可以去答案里看.
Process类继承
from multiprocessing import Queue, Process, current_process
import os
from time import sleep
from random import randint
class Producer(Process):
def __init__(self, queue): # Override
Process.__init__(self) # 加入父类init
self.queue = queue
def run(self): # 调用start()时会调用run(run为单进程).
while True:
self.queue.put('one product')
print(current_process().name + str(
os.getpid()) + ' produced one product, the no of queue now is: %d' % self.queue.qsize())
sleep(randint(1, 3))
class Consumer(Process):
def __init__(self, queue):
Process.__init__(self)
self.queue = queue
def run(self):
while True:
d = self.queue.get(1)
if d != None:
print(current_process().name + str(os.getpid()) + ' consumed %s, the no of queue now is: %d' % (
d, self.queue.qsize()))
sleep(randint(1, 4))
continue
else:
break
if __name__ == "__main__":
queue = Queue(40)
# Init
processed = []
for i in range(3):
processed.append(Producer(queue))
processed.append(Consumer(queue))
# Start
for i in range(len(processed)):
processed[i].start()
# Join
for i in range(len(processed)):
processed[i].join()
Useful Links
An introduction to parallel programming using Python’s multiprocessing module
Python Multiprocessing: Pool vs Process – Comparative Analysis
孤儿进程与僵尸进程