《老鸟python 系列》视频上线了,全网稀缺资源,涵盖python人工智能教程,爬虫教程,web教程,数据分析教程以及界面库和服务器教程,以及各个方向的主流实用项目,手把手带你从零开始进阶高手之路!点击 链接 查看详情




多进程

阅读:232569559    分享到

之前我们已经介绍什么是多进程,本节我们教大家如何使用多进程,我们知道进程是运行的程序,我们运行写好的 python 程序,操作系统就给我们启动一个进程,这个进程叫主进程,然后我们可以在主进程中创多个子进程,那么多个进程就可以实现并发效果,也就是说,当我们的程序中存在多个进程的时候,在某些时候,就会让程序的执行速度变快。以我们之前所学的知识,并不能实现创建进程这个功能,所以我们就需要借助 python 中强大的模块。

Python 中的 multiprocess 提供了 Process 类,使用该类我们可以创建子进程。仔细说来,multiprocess 不是一个模块而是 python 中一个操作、管理进程的包。之所以叫 multi 是取自 multiple 的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程之间数据共享部分,进程同步部分,进程池部分。

创建进程

如何创建多进程,首先我们导入 multiprocessing,然后使用 multiprocessingProcess 类创建子进程,注意 Process 的第一个字母要大写。

# 当前文件名称为 ptest.py
from multiprocessing import Process
import os
import time

def func():
    print("子进程id :", os.getpid())       # 打印子进程 id
    print("所属父进程id :", os.getppid())  # 打印子进程的父进程的 id
    print("Hello 老鸟Python,我是被子进程启动的函数。")
    time.sleep(3)  # 等待三秒再结束子进程

'''
windows 系统下必须要写 if __name__ == '__main__,由于 Windows 没有 fork,
多处理模块启动一个新的 Python 进程并导入调用模块。
如果在导入时调用 Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。
这是隐藏对 Process() 内部调用的原,使用 if __name__ == "__main __",
这个 if 语句中的语句将不会在导入时被调用。其它系统下写不写都可以。
首先我运行当前这个 ptest.py 文件,运行这个文件的程序,那么就产生了进程,这个进程我们称为主进程
'''
if __name__ == '__main__':

    '''
    将函数注册到一个子进程中,p 是一个进程对象,此时还没有启动进程,
    只是创建了一个进程对象。注意,func 是函数名,不要加括号。
    '''
    p = Process(target=func,)

    '''
    告诉操作系统,给我开启一个进程,func 这个函数就被我们新开的这个进程执行了,
    而这个进程是我主进程运行过程中创建出来的进程。
    所以称这个新创建的进程为主进程的子进程,而主进程又可以称为这个新进程的父进程。
    而这个子进程中执行的程序,相当于将现在这个程序代码文件中的程序 copy 到一个你
    看不到的 python 文件中去执行了。就相当于当前这个文件,被另外一个 python 文件
    import 过去并执行了。start 并不是直接就去执行了,我们知道进程有三个状态,进程
    会进入进程的三个状态,就绪,(被调度,也就是时间片切换到它的时候)执行,阻塞,
    并且在这个三个状态之间不断的转换,等待 cpu 执行时间片到了。
    '''
    p.start()

    # 这是主进程的程序,上面开启的子进程的程序是和主进程的程序同时运行的,我们称为并发
    print("我是主进程执行的代码")
    print("主进程id :", os.getpid())  # 打印主进程的 id

运行结果:

我是主进程执行的代码
主进程id : 8772
子进程id : 9924
所属父进程id : 8772
Hello 老鸟Python,我是被子进程启动的函数。

大家发现我上面的例子,在子进程调用的函数中最后一行代码,我让子进程等待了 3 秒再结束,但是,此时的主进程已经运行结束了。这种主进程结束,子进程还在执行的进程我们叫做僵尸进程。

为了防止僵尸进程的出现,我们在主进程代码中加上 join 的地方等待(也就是阻塞住),等待子进程执行完之后,再继续往下执行我的主进程,好多时候,我们主进程需要子进程的执行结果,所以必须要等待。join 感觉就像是将子进程和主进程拼接起来一样,将异步改为同步执行。

from multiprocessing import Process
import time

def func(name):
    print(name)
    time.sleep(3)

if __name__ == '__main__':
    p = Process(target=func, args=("老鸟python",))  # args 是传入到 func 的参数,注意 args 必须为元组。
    p.start()
    print("我是主进程执行的语句!")
    p.join()  # 该语句会阻塞 p 启动的子进程,直到子进程执行结束,才继续往下执行代码
    print("父进程执行结束!")

运行结果:

我是主进程执行的语句!
老鸟python
父进程执行结束!

对于上面的代码,初学者会误以为 func 函数是子进程函数,这种认知是错误的,函数本身一开始是不属于任何进程。我只说一句,函数由哪个进程调用,该函数的代码就是属于哪个进程的。

from multiprocessing import Process
import os

def func(name):
    print("调用我的进程名字:%s;进程id是:%s" % (name, os.getpid()))

if __name__ == '__main__':
    p1 = Process(target=func, args=("子进程1",))  # args 是传入到 func 的参数,注意 args 必须为元组。
    p2 = Process(target=func, args=("子进程2",))  # args 是传入到 func 的参数,注意 args 必须为元组。

    p1.start()  # 子进程 1 启动 func 函数
    p2.start()  # 子进程 2 启动 func 函数

    p1.join()
    p2.join()
    func("主进程")   # 主进程启动 func 函数

运行结果:

调用我的进程名字:子进程1;进程id是:548
调用我的进程名字:子进程2;进程id是:7784
调用我的进程名字:主进程;进程id是:9064

我们还可以通过类继承的方法来实现多进程

from multiprocessing import Process
import os

'''
自己写一个类,继承 Process 类,我们通过 init 方法可以传参数,如果只写一个 run 方法,
那么没法传参数,因为创建对象的是传参就是在 init 方法里面,这是面向对象的基础知识
'''
class MyProcess(Process):
    def __init__(self, person):
        super().__init__()
        self.person = person

    def run(self):
        print(os.getpid())
        print(self.pid)
        print('我是谁:%s' % self.person)

if __name__ == '__main__':
    p1 = MyProcess("老鸟python")
    p2 = MyProcess("如花")

    p1.start()  # start 内部会自动调用 run 方法
    p2.start()

    p1.join()
    p2.join()
    print("主进程结束")

运行结果:

6344
6344
我是谁:老鸟python
10616
10616
我是谁:如花
主进程结束

进程间的数据共享

每个子进程的数据都是由父进程提供的,每启动一个子进程就从父进程克隆一份数据。

创建一个进程需要非常大的开销,每个进程都有自己独立的数据空间,不同进程之间通常是不能共享数据的,要想共享数据,一般通过中间件来实现。

下面我们尝试用一个全局列表来实现进程间的数据共享:

from multiprocessing import Process

shareddata = []

def func(name):
    shareddata.append(name)
    print("我是:%s,shareddata的值是:%s,shareddata的地址是:%s" % (name, shareddata, id(shareddata)))

if __name__ == '__main__':
    p1 = Process(target=func, args=("进程1",))
    p2 = Process(target=func, args=("进程2",))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("我是:主进程,shareddata的值是:", shareddata)

运行结果:

我是:进程2,shareddata的值是:['进程2'],shareddata的地址是:42709960
我是:进程1,shareddata的值是:['进程1'],shareddata的地址是:12039192
我是:主进程,shareddata的值是: []

大家运行一下代码,就可以知道,全局列表 shareddata 没有起到任何作用,在主进程和子进程中,shareddata指向内存中不同的列表。

想要在进程之间进行数据共享可以使用 QueuesArrayManager 这三个 multiprocess 模块提供的类。

使用 Array 共享数据

对于 Array 数组类,括号内的“i”表示它内部的元素全部是 int 类型,数组内的元素可以预先指定,也可以只指定数组的长度。Array 类在实例化的时候必须指定数组的数据类型和数组的大小,比如我们指定 3 个整数,可以写成 shareddatas= Array("i", 3)。对于数据类型有下面的对应关系:

'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double

下面我们使用 Array 来实现进程间数据共享:

from multiprocessing import Process
from multiprocessing import Array

def func(data, shareddatas):
    shareddatas[0] = data
    print("子进程中sharedatas 中的值:", shareddatas[0])

if __name__ == '__main__':
    shareddatas = Array('i', 1)

    p = Process(target=func, args=(250, shareddatas))
    p.start()
    p.join()

    print("主进程中sharedatas 中的值:", shareddatas[0])

运行结果:

子进程中sharedatas 中的值: 250
主进程中sharedatas 中的值: 250

使用 Manager 共享数据

通过 Manager 类也可以实现进程间数据的共享。Manager() 返回的 manager 对象提供一个服务进程,使得其他进程可以通过代理的方式操作 Python 对象。manager 对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式。

from multiprocessing import Process
from multiprocessing import Manager
import os

def func(key, shareddata):
    shareddata[key] = "我是进程" + str(os.getpid())
    print(shareddata)

if __name__ == '__main__':
    shareddata = Manager().dict()

    p1 = Process(target=func, args=("老鸟python", shareddata))
    p1.start()

    p2 = Process(target=func, args=("ruhua", shareddata))
    p2.start()

    p1.join()
    p2.join()

    shareddata["贾跃亭"] = "我是进程" + str(os.getpid())
    print(shareddata)

运行结果:

{'ruhua': '我是进程8592'}
{'ruhua': '我是进程8592', '老鸟python': '我是进程10508'}
{'ruhua': '我是进程8592', '老鸟python': '我是进程10508', '贾跃亭': '我是进程8376'}

使用 queues 的 Queue 类共享数据

multiprocessing 是一个包,它内部又一个 queues 模块,提供了一个 Queue 队列类,可以实现进程间的数据共享,如下例所示:

import multiprocessing
from multiprocessing import Process
from multiprocessing import queues

def func(data, shareddatas):
    shareddatas.put(data)

if __name__ == '__main__':
    shareddata = queues.Queue(1, ctx=multiprocessing)  # 指定队列只能放一个元素

    p = Process(target=func, args=("老鸟python", shareddata))
    p.start()
    p.join()

    print(shareddata.get())  # 取出来刚刚从子进程放进队里的数据,注意,此时队列里就没有数据了

运行结果:

老鸟python

关于队列(queues),上面我们指定队列里面只能放入一个元素,如果超过一个,再次往队列里面 put 元素时则会阻塞,直到队列的元素被取出,才能放进去。

Python 库中还有一个 Queue,Queue 和 queues 很容易就搞混淆了。甚至是 multiprocessing 自己还有一个 Queue 类(大写的Q),一样能实现 queues.Queue 的功能,导入方式是 from multiprocessing import Queue

进程同步

由于多进程是并发执行的,如果多个进程“同时”写某个共享变量,就有可能出现的问题,我们可以设置进程锁来防止多个进程同时写,也就是说把写操作给锁起来,同时只能有一个进程进入这个写操作。在 multiprocessing 里也有同名的锁类 RLockLockEventConditionSemaphore,连用法都是一样样的,这一点非常友好。

from multiprocessing import Process
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import time
import os

def func(data, shareddatas, lc):
    lc.acquire()
    shareddatas[0] = shareddatas[0] + data
    time.sleep(2)
    print("进程%s修改 shareddatas 里面的数据为:%s" % (os.getpid(), shareddatas[0]))
    lc.release()


if __name__ == '__main__':
    shareddatas = Array('i', 1)  # 定义 shareddatas 里面只能放一个元素
    lock = RLock()
    shareddatas[0] = 1
    p1 = Process(target=func, args=(10, shareddatas, lock))
    p2 = Process(target=func, args=(20, shareddatas, lock))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

运行结果:

进程9444修改 shareddatas 里面的数据为:11
进程8452修改 shareddatas 里面的数据为:31

进程池 Pool 类

进程启动的开销比较大,过多的创建新进程不但会消耗大量的内存空间(操作系统对每个进程要分配 4GB 的内存空间),而且还会浪费时间(因为 CPU 要来回切换轮询进程)。我们可以使用进程池控制内存开销。

比较幸运的是,Python 给我们内置了一个进程池,不需要像线程池那样要自己写,你只需要简单的 from multiprocessing import Pool 导入就行。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中常用的方法:

apply() 同步执行(串行)

apply_async() 异步执行(并行)

terminate() 立刻关闭进程池

join() 主进程等待所有子进程执行完毕。必须在close或terminate()之后。

close() 等待所有进程结束后,才关闭进程池。

from multiprocessing import Pool
import time

def func(args):
    time.sleep(1)
    print("正在执行进程 ", args)

if __name__ == '__main__':
    p = Pool(10)     # 创建一个包含 10 个进程的进程池
    for num in range(100):  # 创建 100 个进程,实际上这些进程都在进程池内
        p.apply_async(func=func, args=(num,))

    p.close()         # 等子进程执行完毕后关闭进程池
    p.join()

本节重要知识点

会使用多进程。

会多进程通信。

作业

写两个进程,一个进程做读取一个文件,并计算文件所占的字节大小,把结果发给另一个进程。


如果以上内容对您有帮助,请老板用微信扫一下赞赏码,赞赏后加微信号 birdpython 领取免费视频。


登录后评论

user_image
火鸡味锅巴
2020年9月9日 13:53 回复

那在python里面我该如何使用多进程,同时考虑并发锁的问题?


user_image
lookas2001
2020年5月8日 21:17 回复

多进程编程难度应该更复杂一点吧 比如如何共享数据 我还没看到多进程那一块


user_image
貔卡貅
2019年10月22日 07:30 回复

太赞了


user_image
喝咖啡不会醉
2019年2月25日 22:55 回复

为啥cpu密集型适合用多进程,而io密集型适合用多线程? 因为cpu密集型说明各个任务都需要cpu, 反复切换没有意义,即并发意义不大。而多进程在多核cpu上是可以并行的,注意是并行。所以能加速。 而 io密集型说明任务经常会阻塞等待不怎么需要cpu, 所以即使单核上做并发,即使每个时刻只运行一个进程,依然能节省某个任务的等待时间,让cpu忙起来。所以能提速。


user_image
crystal
2019年2月24日 23:52 回复

多进程共享数据有 shared memory,还有 Value,Array,管理器 Manager等