之前我们已经介绍什么是多进程,本节我们教大家如何使用多进程,我们知道进程是运行的程序,我们运行写好的 python 程序,操作系统就给我们启动一个进程,这个进程叫主进程,然后我们可以在主进程中创多个子进程,那么多个进程就可以实现并发效果,也就是说,当我们的程序中存在多个进程的时候,在某些时候,就会让程序的执行速度变快。以我们之前所学的知识,并不能实现创建进程这个功能,所以我们就需要借助 python 中强大的模块。
Python 中的 multiprocess 提供了 Process 类,使用该类我们可以创建子进程。仔细说来,multiprocess 不是一个模块而是 python 中一个操作、管理进程的包。之所以叫 multi 是取自 multiple 的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程之间数据共享部分,进程同步部分,进程池部分。
如何创建多进程,首先我们导入 multiprocessing
,然后使用 multiprocessing
的 Process
类创建子进程,注意
Process
的第一个字母要大写。
# 当前文件名称为 ptest.py from multiprocessing import Process import os import time def func(): print("子进程id :", os.getpid()) # 打印子进程 id print("所属父进程id :", os.getppid()) # 打印子进程的父进程的 id print("Hello 老鸟Python,我是被子进程启动的函数。") time.sleep(3) # 等待三秒再结束子进程 ''' windows 系统下必须要写 if __name__ == '__main__,由于 Windows 没有 fork, 多处理模块启动一个新的 Python 进程并导入调用模块。 如果在导入时调用 Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。 这是隐藏对 Process() 内部调用的原,使用 if __name__ == "__main __", 这个 if 语句中的语句将不会在导入时被调用。其它系统下写不写都可以。 首先我运行当前这个 ptest.py 文件,运行这个文件的程序,那么就产生了进程,这个进程我们称为主进程 ''' if __name__ == '__main__': ''' 将函数注册到一个子进程中,p 是一个进程对象,此时还没有启动进程, 只是创建了一个进程对象。注意,func 是函数名,不要加括号。 ''' p = Process(target=func,) ''' 告诉操作系统,给我开启一个进程,func 这个函数就被我们新开的这个进程执行了, 而这个进程是我主进程运行过程中创建出来的进程。 所以称这个新创建的进程为主进程的子进程,而主进程又可以称为这个新进程的父进程。 而这个子进程中执行的程序,相当于将现在这个程序代码文件中的程序 copy 到一个你 看不到的 python 文件中去执行了。就相当于当前这个文件,被另外一个 python 文件 import 过去并执行了。start 并不是直接就去执行了,我们知道进程有三个状态,进程 会进入进程的三个状态,就绪,(被调度,也就是时间片切换到它的时候)执行,阻塞, 并且在这个三个状态之间不断的转换,等待 cpu 执行时间片到了。 ''' p.start() # 这是主进程的程序,上面开启的子进程的程序是和主进程的程序同时运行的,我们称为并发 print("我是主进程执行的代码") print("主进程id :", os.getpid()) # 打印主进程的 id
运行结果:
我是主进程执行的代码 主进程id : 8772 子进程id : 9924 所属父进程id : 8772 Hello 老鸟Python,我是被子进程启动的函数。
大家发现我上面的例子,在子进程调用的函数中最后一行代码,我让子进程等待了 3 秒再结束,但是,此时的主进程已经运行结束了。这种主进程结束,子进程还在执行的进程我们叫做僵尸进程。
为了防止僵尸进程的出现,我们在主进程代码中加上 join 的地方等待(也就是阻塞住),等待子进程执行完之后,再继续往下执行我的主进程,好多时候,我们主进程需要子进程的执行结果,所以必须要等待。join 感觉就像是将子进程和主进程拼接起来一样,将异步改为同步执行。
from multiprocessing import Process import time def func(name): print(name) time.sleep(3) if __name__ == '__main__': p = Process(target=func, args=("老鸟python",)) # args 是传入到 func 的参数,注意 args 必须为元组。 p.start() print("我是主进程执行的语句!") p.join() # 该语句会阻塞 p 启动的子进程,直到子进程执行结束,才继续往下执行代码 print("父进程执行结束!")
运行结果:
我是主进程执行的语句! 老鸟python 父进程执行结束!
对于上面的代码,初学者会误以为 func 函数是子进程函数,这种认知是错误的,函数本身一开始是不属于任何进程。我只说一句,函数由哪个进程调用,该函数的代码就是属于哪个进程的。
from multiprocessing import Process import os def func(name): print("调用我的进程名字:%s;进程id是:%s" % (name, os.getpid())) if __name__ == '__main__': p1 = Process(target=func, args=("子进程1",)) # args 是传入到 func 的参数,注意 args 必须为元组。 p2 = Process(target=func, args=("子进程2",)) # args 是传入到 func 的参数,注意 args 必须为元组。 p1.start() # 子进程 1 启动 func 函数 p2.start() # 子进程 2 启动 func 函数 p1.join() p2.join() func("主进程") # 主进程启动 func 函数
运行结果:
调用我的进程名字:子进程1;进程id是:548 调用我的进程名字:子进程2;进程id是:7784 调用我的进程名字:主进程;进程id是:9064
我们还可以通过类继承的方法来实现多进程
from multiprocessing import Process import os ''' 自己写一个类,继承 Process 类,我们通过 init 方法可以传参数,如果只写一个 run 方法, 那么没法传参数,因为创建对象的是传参就是在 init 方法里面,这是面向对象的基础知识 ''' class MyProcess(Process): def __init__(self, person): super().__init__() self.person = person def run(self): print(os.getpid()) print(self.pid) print('我是谁:%s' % self.person) if __name__ == '__main__': p1 = MyProcess("老鸟python") p2 = MyProcess("如花") p1.start() # start 内部会自动调用 run 方法 p2.start() p1.join() p2.join() print("主进程结束")
运行结果:
6344 6344 我是谁:老鸟python 10616 10616 我是谁:如花 主进程结束
每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。
创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。
下面我们尝试用一个全局列表来实现进程间的数据共享:
from multiprocessing import Process shareddata = [] def func(name): shareddata.append(name) print("我是:%s,shareddata的值是:%s,shareddata的地址是:%s" % (name, shareddata, id(shareddata))) if __name__ == '__main__': p1 = Process(target=func, args=("进程1",)) p2 = Process(target=func, args=("进程2",)) p1.start() p2.start() p1.join() p2.join() print("我是:主进程,shareddata的值是:", shareddata)
运行结果:
我是:进程2,shareddata的值是:['进程2'],shareddata的地址是:42709960 我是:进程1,shareddata的值是:['进程1'],shareddata的地址是:12039192 我是:主进程,shareddata的值是: []
大家运行一下代码,就可以知道,全局列表 shareddata 没有起到任何作用,在主进程和子进程中,shareddata指向内存中不同的列表。
想要在进程之间进行数据共享可以使用 Queues
、Array
和 Manager
这三个 multiprocess 模块提供的类。
使用 Array 共享数据
对于 Array 数组类,括号内的“i”表示它内部的元素全部是 int 类型,数组内的元素可以预先指定,也可以只指定数组的长度。Array
类在实例化的时候必须指定数组的数据类型和数组的大小,比如我们指定 3 个整数,可以写成 shareddatas= Array("i", 3)
。对于数据类型有下面的对应关系:
'c': ctypes.c_char, 'u': ctypes.c_wchar, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'f': ctypes.c_float, 'd': ctypes.c_double
下面我们使用 Array 来实现进程间数据共享:
from multiprocessing import Process from multiprocessing import Array def func(data, shareddatas): shareddatas[0] = data print("子进程中sharedatas 中的值:", shareddatas[0]) if __name__ == '__main__': shareddatas = Array('i', 1) p = Process(target=func, args=(250, shareddatas)) p.start() p.join() print("主进程中sharedatas 中的值:", shareddatas[0])
运行结果:
子进程中sharedatas 中的值: 250 主进程中sharedatas 中的值: 250
使用 Manager 共享数据
通过 Manager 类也可以实现进程间数据的共享。Manager()
返回的 manager
对象提供一个服务进程,使得其他进程可以通过代理的方式操作 Python 对象。manager 对象支持
list
, dict
, Namespace
, Lock
, RLock
,
Semaphore
, BoundedSemaphore
, Condition
, Event
,
Barrier
, Queue
, Value
,Array
等多种格式。
from multiprocessing import Process from multiprocessing import Manager import os def func(key, shareddata): shareddata[key] = "我是进程" + str(os.getpid()) print(shareddata) if __name__ == '__main__': shareddata = Manager().dict() p1 = Process(target=func, args=("老鸟python", shareddata)) p1.start() p2 = Process(target=func, args=("ruhua", shareddata)) p2.start() p1.join() p2.join() shareddata["贾跃亭"] = "我是进程" + str(os.getpid()) print(shareddata)
运行结果:
{'ruhua': '我是进程8592'} {'ruhua': '我是进程8592', '老鸟python': '我是进程10508'} {'ruhua': '我是进程8592', '老鸟python': '我是进程10508', '贾跃亭': '我是进程8376'}
使用 queues 的 Queue 类共享数据
multiprocessing 是一个包,它内部又一个 queues 模块,提供了一个 Queue 队列类,可以实现进程间的数据共享,如下例所示:
import multiprocessing from multiprocessing import Process from multiprocessing import queues def func(data, shareddatas): shareddatas.put(data) if __name__ == '__main__': shareddata = queues.Queue(1, ctx=multiprocessing) # 指定队列只能放一个元素 p = Process(target=func, args=("老鸟python", shareddata)) p.start() p.join() print(shareddata.get()) # 取出来刚刚从子进程放进队里的数据,注意,此时队列里就没有数据了
运行结果:
老鸟python
关于队列(queues),上面我们指定队列里面只能放入一个元素,如果超过一个,再次往队列里面 put 元素时则会阻塞,直到队列的元素被取出,才能放进去。
Python 库中还有一个 Queue,Queue 和 queues 很容易就搞混淆了。甚至是 multiprocessing 自己还有一个 Queue 类(大写的Q),一样能实现 queues.Queue
的功能,导入方式是 from
multiprocessing import Queue
。
由于多进程是并发执行的,如果多个进程“同时”写某个共享变量,就有可能出现的问题,我们可以设置进程锁来防止多个进程同时写,也就是说把写操作给锁起来,同时只能有一个进程进入这个写操作。在 multiprocessing
里也有同名的锁类 RLock
,Lock
,Event
,Condition
和
Semaphore
,连用法都是一样样的,这一点非常友好。
from multiprocessing import Process from multiprocessing import Array from multiprocessing import RLock, Lock, Event, Condition, Semaphore import time import os def func(data, shareddatas, lc): lc.acquire() shareddatas[0] = shareddatas[0] + data time.sleep(2) print("进程%s修改 shareddatas 里面的数据为:%s" % (os.getpid(), shareddatas[0])) lc.release() if __name__ == '__main__': shareddatas = Array('i', 1) # 定义 shareddatas 里面只能放一个元素 lock = RLock() shareddatas[0] = 1 p1 = Process(target=func, args=(10, shareddatas, lock)) p2 = Process(target=func, args=(20, shareddatas, lock)) p1.start() p2.start() p1.join() p2.join()
运行结果:
进程9444修改 shareddatas 里面的数据为:11 进程8452修改 shareddatas 里面的数据为:31
进程启动的开销比较大,过多的创建新进程不但会消耗大量的内存空间(操作系统对每个进程要分配 4GB 的内存空间),而且还会浪费时间(因为 CPU 要来回切换轮询进程)。我们可以使用进程池控制内存开销。
比较幸运的是,Python 给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的 from multiprocessing import Pool
导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中常用的方法:
apply() 同步执行(串行)
apply_async() 异步执行(并行)
terminate() 立刻关闭进程池
join() 主进程等待所有子进程执行完毕。必须在close或terminate()之后。
close() 等待所有进程结束后,才关闭进程池。
from multiprocessing import Pool import time def func(args): time.sleep(1) print("正在执行进程 ", args) if __name__ == '__main__': p = Pool(10) # 创建一个包含 10 个进程的进程池 for num in range(100): # 创建 100 个进程,实际上这些进程都在进程池内 p.apply_async(func=func, args=(num,)) p.close() # 等子进程执行完毕后关闭进程池 p.join()
会使用多进程。
会多进程通信。
写两个进程,一个进程做读取一个文件,并计算文件所占的字节大小,把结果发给另一个进程。
多进程编程难度应该更复杂一点吧 比如如何共享数据 我还没看到多进程那一块
太赞了
为啥cpu密集型适合用多进程,而io密集型适合用多线程? 因为cpu密集型说明各个任务都需要cpu, 反复切换没有意义,即并发意义不大。而多进程在多核cpu上是可以并行的,注意是并行。所以能加速。 而 io密集型说明任务经常会阻塞等待不怎么需要cpu, 所以即使单核上做并发,即使每个时刻只运行一个进程,依然能节省某个任务的等待时间,让cpu忙起来。所以能提速。
多进程共享数据有 shared memory,还有 Value,Array,管理器 Manager等
那在python里面我该如何使用多进程,同时考虑并发锁的问题?