Python 多进程
处于跨平台考虑,仅记录multiprocessing 库的用法
因为Python 中的多线程在CPU 密集型的任务中会变成线性的,并不能充分发挥多核处理器的优势。所以我们可以通过多进程来进行并行任务,并且我们还可以通过管道、共享内存、进程锁等机制来实现进程间的数据共享。
创建一个子进程
首先我们通过multiprocessing.Process
创建一个子进程,但是在multiprocessing
模块中Process
和process
是不同的东西,并且在Windows 平台上使用时必须要用if __name__ == '__main__':
,尤其需要注意。
import os
from multiprocessing import Process, current_process
def task(param):
info = """
进程名:%s
进程Id:%s
模块名:%s
父进程Id:%s
传入参数:%s
""" % (current_process().name, os.getpid(), __name__, os.getppid(), param)
print(info)
if __name__ == '__main__':
t = Process(target=task, args=("233",))
t.start()
t.join()
共享数据
因为每个进程都有自己的数据空间,在Python 脚本中声明的全局变量也是不能共享的:
import os
from multiprocessing import Process, current_process
arr = []
def task(i):
print("In process %s, the arr's address is: %s"%(i, id(arr)))
if __name__ == '__main__':
for i in range(2):
t = Process(target=task, args=(i,))
t.start()
t.join()
# 输出:
# In process 0, the arr's address is: 2172256009216
# In process 1, the arr's address is: 2485094464512
如果要在进程间共享数据,可以使用multiprocessing
提供的Queues
、Array
和Manager
三个类。
Array
Array 类在初始化时必须指定类型与长度,也可以选择添加内容。如:arr = Array('i',5)
。下面是数据类型的定义:
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
需要注意的是:Array 对象必须作为参数传入子进程,否则也是不起作用的:
import os
from multiprocessing import Array, Process, current_process
def task(i, arr):
arr[i] = i
print("In process %s, the arr's address is: %s" % (i, id(arr)))
if __name__ == '__main__':
arr = Array('i', [5, 5, 5, 5, 5]) # 因为Array 需要作为参数传入子进程,所以写在里面更清晰些
for i in range(2):
t = Process(target=task, args=(i, arr,))
t.start()
t.join()
print(arr[0], arr[1], arr[2])
# 输出:
# In process 0, the arr's address is: 2163785163584 # 虽然地址不同,但内容是一样的
# In process 1, the arr's address is: 1995314455360
# 0 1 5
Manager
相比于Array,Manager 提供一个服务进程,其他进程可以通过代理的方式操作Python 对象,并且其支持的对象也更多:
import os
from multiprocessing import Array, Manager, Process, current_process
def task(i, dic):
dic[i] = i
print("In process %s, the dic's address is: %s" % (i, id(dic)))
if __name__ == '__main__':
dic = Manager().dict() # 同样,Manager 需要作为参数传入子进程
for i in range(2):
t = Process(target=task, args=(i, dic,))
t.start()
t.join()
print(dic)
# 输出:
# In process 0, the dic's address is: 2803704884720
# In process 1, the dic's address is: 2914245176816
# {0: 0, 1: 1}
Queue
Queue 队列类似于管道的概念,多个进程可以同时往里面放数据和取数据:
import os
import multiprocessing as mp
from multiprocessing import Process, Queue,queues
def task(i, q: Queue):
res = q.get(block=True, timeout=3)
print("In process %s, get: %s" % (i, res))
if __name__ == '__main__':
q = Queue(maxsize=5) # 同样,Queue 需要作为参数传入子进程
# # 等价于
# q = queues.Queue(maxsize=5, ctx=mp.get_context())
# # 关于队列上下文的问题,参考:
# # 1. https://stackoverflow.com/a/24941654/14791867
# # 2. https://docs.python.org/3.4/library/multiprocessing.html#contexts-and-start-methods
# # 简单理解就是要告诉multiprocessing 通过哪种方法创建子进程
q.put(5)
for i in range(2):
t = Process(target=task, args=(i, q,))
t.start()
# t.join()
sleep(0.5)
q.put(6)
print("233")
# 输出:
# In process 1, get: 5
# 233
# In process 0, get: 6
可以看到多个进程排队从一个队列里面取数据,取出以后再放进去一个给其他进程用。如果进程get
不到数据,则会一直等待。而且进程获取数据的顺序也是随机的。
Pipe 管道
对于两个进程间的通信来说,我们更常用管道来作为载体,使用完后注意关闭管道:
from multiprocessing.connection import _ConnectionBase
from multiprocessing import Pipe, Process
from time import sleep
def task(i, cEnd:_ConnectionBase):
res = cEnd.recv()
print("In process %s, get: %s" % (i, res))
cEnd.send("pong")
cEnd.close()
if __name__ == '__main__':
(pEnd, cEnd) =Pipe(duplex=True) # 同样,Pipe 需要作为参数传入子进程
# 如果duplex=False,则pEnd 只能接收,cEnd 只能发送
pEnd.send("ping")
t = Process(target=task, args=(1, cEnd,))
t.start()
print("In main process, get: %s" % (pEnd.recv()))
print("233")
sleep(0.5)
pEnd.close()
# 输出:
# In process 1, get: ping
# In main process, get: pong
# 233
进程锁
同多线程一样,进程间为了避免数据竞争或者脏数据的问题,页需要通过进程锁来保持数据同步:
# 以下代码摘自:https://www.liujiangblog.com/course/python/82
from multiprocessing import Process
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import time
def func(i,lis,lc):
lc.acquire()
lis[0] = lis[0] - 1
time.sleep(1)
print('say hi', lis[0])
lc.release()
if __name__ == "__main__":
array = Array('i', 1)
array[0] = 10
lock = RLock()
for i in range(10):
p = Process(target=func, args=(i, array, lock))
p.start()
# 输出:
# say hi 9
# say hi 8
# say hi 7
# say hi 6
# say hi 5
# say hi 4
# say hi 3
# say hi 2
# say hi 1
# say hi 0
进程池
创建进程的开销很大,如果需要同时启动很多进程,可以考虑使用进程池:
from multiprocessing import Pool
from time import sleep
def task(i):
print("In process %s, sleep: %ss" % (i, 7-i))
sleep(7-i)
if __name__ == '__main__':
p = Pool(5) # 创建一个包含5个进程的进程池
for i in range(7):
p.apply_async(func=task, args=(i,)) # 异步执行,并行
# p.apply(func=task, args=(i,)) # 同步执行,串行
p.close() # 等所有进程结束后关闭进程池
# p.terminate() # 立即关闭进程池
p.join() # 主进程等待进程池关闭后才退出
# 输出:
# In process 0, sleep: 7s
# In process 1, sleep: 6s
# In process 2, sleep: 5s
# In process 3, sleep: 4s
# In process 4, sleep: 3s
# In process 5, sleep: 2s
# In process 6, sleep: 1s
以上便是多进程同步的所有内容了,就自己的使用经验来看:应当尽量选择管道、队列,其次在选择进程锁、共享数据,一是写起来简单、再者不容易出错。