Parallel Programming in Python

Parallel in Python

并行计算在Python中并不少见, 相关多线程,多进程,threading, Pool, Queue的介绍网上都有很多资料, 这里不做赘述. 下面主要从几个常见的问题出发, 了解一些概念之间的问题(Based on Python3).

哪个是我该用的Queue?

Queue是个在多进程中是个让人困惑的概念. 可以用多个方式导入Queue, 分别是 multiprocessing.queues.Queue, 和 multiprocessing中Manager中的Queue, 还有queue中的Queue

从Manager中导入的Queue

from multiprocessing import Manager
m = Manager()
q_from_manager = m.Queue()

Manager默认是multiprocessing中SyncManager类, SyncManager类中的Queue()方法则返回queue.Queue(). 但是并不是说由Manager生成的Queue 和 queue.Queue是一样的. 相反, 在Pool,Process和父进程通信的过程中, Manager.Queue可以很好的实现父子进程间通信, 子进程间通信, 而queue.Queue()并不能, 说明Manager通过继承的multiprocessing.managers.BaseManager中的机制来保障进程间通信, 而queue.Queue()只是实现这一机制的底层.

通过multiprecessing导入Queue

from multiprocessing import Queue
q = Queue(1)
print(type(q))
<class 'multiprocessing.queues.Queue'>

说明 from multiprocessing import Queue 导入的 Queue本质是 multiprocessing.queues.Queue, 但是后者的init方法为 __init__(self, maxsize=0, *, ctx), 需要更多的配置, 所以在使用上还需配置, 不推荐使用.

这样就可以把正常使用的Queue缩小到两个范围 from multiprocessing import Queue 和 Manager的Queue()方法. 下文简写为 mp.Queue 和 mgr.Queue

mp.Queue 可以用于spawn出的子进程间通信, 也可以用于 父进程和 spawn出的子进程通信,
multiprocessing.Queue只能用于显示地用Process创建的子进程间或与子进程与父进程之间通信;
multiprocessing.Manager.Queue 能用于Pool创建的子进程之间通信, 也能用于父进程和子进程通信, 也能用于Process进程与Pool创建的子进程之间通信. 这个还是比较强大的功能.
这段以后再写. 需要重新设计一遍对应的实验.

选择合适的Queue

from queue import Queue
普通的队列模式

from multiprocessing import Queue
多进程并发的Queue队列, 用于解决多进程间通信问题.

如果要用进程池, 则必须使用Manager.Queue()

问题

仍留下的问题是, 如果用Pool, 还需不需要用JoinableQueue, 或者, 该怎么结束Pool中的进程? #TODO 仍需要找一个案例, 能够很好的解释如何停止Pool中的进程.

Terminate multi process/thread in Python correctly and gracefully中有很好的如何解决Process结束的问题. 但仍有一些点需要注意.
例如, 在Problem1中提到的结束无限循环子进程的解决方案中, 使用了Process.terminate方法. 这个案例其实主要表现的是, 在子进程的join之前的所有父进程代码均会执行, 然后阻塞在join语句. 这样, 因为子进程是无限循环, 所以不会自己结束, 从而有必要在父进程中显示的调用子进程的terminate语句. 但是调用terminate的时候如果子进程的任务没有完成, 也会被立刻消灭. 这就会导致很难看的结果.

import multiprocessing
import time

def hang():
    while True:
        print('hanging..')
        time.sleep(10)
        print('after 10 , hanging...')

def main():
    p = multiprocessing.Process(target=hang)
    p.start()
    time.sleep(1)
    p.terminate()
    p.join()
    print('main process exiting..')

if __name__ == '__main__':
    main()

子进程会在父进程休眠1秒之后执行剩下的语句, 也就是terminate, 这时子进程仍处在sleep(10)中, 所以这时候被结束, 会导致第二个print语句不被执行.

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,10几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,这时候进程池Pool发挥作用的时候就到了。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。这里有一个简单的例子

#!/usr/bin/env python
#coding=utf-8
from multiprocessing import Pool
from time import sleep

def f(x):
    for i in range(10):
        print '%s --- %s ' % (i, x)
        sleep(1)


def main():
    pool = Pool(processes=3)    # set the processes max number 3
    for i in range(11,20):
        result = pool.apply_async(f, (i,))
    pool.close()
    pool.join()
    if result.successful():
        print 'successful'


if __name__ == "__main__":
    main()

先创建容量为3的进程池,然后将f(i)依次传递给它,运行脚本后利用ps aux | grep pool.py查看进程情况,会发现最多只会有三个进程执行。pool.apply_async()用来向进程池提交目标请求,pool.join()是用来等待进程池中的worker进程执行完毕,防止主进程在worker进程结束前结束。但pool.join()必须使用在pool.close()或者pool.terminate()之后。
其中close()跟terminate()的区别在于close()会等待池中的worker进程执行结束再关闭pool,而terminate()则是直接关闭。result.successful()表示整个调用执行的状态,如果还有worker没有执行完,则会抛出AssertionError异常。
利用multiprocessing下的Pool可以很方便的同时自动处理几百或者上千个并行操作,脚本的复杂性也大大降低。

没有找到合适的方法能够让Pool中的进程自动的停止. 如果没有正规的方式的话, 估计是要用ternimate语句在父进程中显示的调用, 从而停止Pool中的子进程.

如何在thread, multiprocessing subprocess中选择合适任务的并行实现方式?

经常遇到的一个问题是, 不知道该怎样选择一个合适任务的并行实现方式. 在Stack Overflow上有个很好的解答 Deciding among subprocess, multiprocessing, and thread in Python?
从这个回答中不难看出, subprocess 往往用来提供同步反馈. 可以在主进程中spawn出一个subprocess, 在子进程中完成一些任务, 然后等待子进程的完成或读取子进程的输出. 理论上来说可以spawn出100甚至更多的subprocess来完成任务, 但是最大的缺点在于subprocess的I/O block.

multiprocessing往往是python用来实现真正并行的方式. 引用回答中的话,

multiprocessing is for running functions within your existing (Python) code with support for more flexible communications among this family of processes.

而且multiprocessing还可以规避GIL的问题, 从而方便的扩展到CPU的多核上实现并行.

而threading则是一个受用面很窄的用法, 因为它天然I/O限制, 无法扩展到多核CPU上.

综上而言, 如果只是想单独的把一个任务放在主进程之外进行, 可以用subprocess; 如果是想高效并行的话, 用multiprocessing ; 少用threading(个人意见). 想知道更多的区别, 可以去答案里看.

Process类继承

from multiprocessing import Queue, Process, current_process
import os
from time import sleep
from random import randint


class Producer(Process):
    def __init__(self, queue): # Override
        Process.__init__(self) # 加入父类init
        self.queue = queue 

    def run(self): # 调用start()时会调用run(run为单进程).
        while True:
            self.queue.put('one product')
            print(current_process().name + str(
                os.getpid()) + ' produced one product, the no of queue now is: %d' % self.queue.qsize())
            sleep(randint(1, 3))


class Consumer(Process):
    def __init__(self, queue):
        Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            d = self.queue.get(1)
            if d != None:
                print(current_process().name + str(os.getpid()) + ' consumed  %s, the no of queue now is: %d' % (
                    d, self.queue.qsize()))
                sleep(randint(1, 4))
                continue
            else:
                break


if __name__ == "__main__":
    queue = Queue(40)
    # Init
    processed = []
    for i in range(3):
        processed.append(Producer(queue))
    processed.append(Consumer(queue))

    # Start
    for i in range(len(processed)):
        processed[i].start()

    # Join
    for i in range(len(processed)):
        processed[i].join()

Useful Links

An introduction to parallel programming using Python’s multiprocessing module
Python Multiprocessing: Pool vs Process – Comparative Analysis
孤儿进程与僵尸进程

发表评论

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据