目录
多进程的含义
进程(Process)是具有一定独立功能的程序关于某给数据集合上的一次运行活动,是系统进行资源分配和调度的一个独立单位。
多进程就是启用多个进程同时运行,由于进程是线程的集合,而且进程是由一个或多个线程构成,所以多进程的运行意味着有大于或等于进程数量的线程在运行。
Python多进程的优势
根据这节线程基本原理了解到,由于进程中GIL的存在,Python中的多线程并不能很好地发挥多核优势,一个进程中的多线程,在同一时刻只能有一个线程运行。
对于多进程来说,每一个进程都有属于自己的GIL,所以,在多核处理处理器下,多进程的运行是不会受到GIL的影响的。因此,多进程能更好的发挥多核优势。
对于爬虫这种IO密集型任务来说,多线程和多进程影响的差别不大,对于计算密集型任务来说,python的多进程相比多线程,其多核运行效率会有成倍的提升。
总的来说,python的多进程整体看来是比多线程更有优势,所以在条件允许的情况下,能用多进程就尽量使用多进程。
需要注意的是,由于进程是系统进行资源分配和调度的一个独立单位,所以各个进程之间的数据是无法共享的,如多个进程无法共享一个全局变量,进程之间的数据共享需要有单独的机制来实现。
多进程的实现
在Python中有内置的库来实现多进程的,它就是multiprocessing。
multiprocessing提供了一系列的组件,如Process(进程)、Queue(队列)、Semaphore(信号量)、Pipe(管道)、Lock(锁)、Pool(进程池)等,
直接使用Process类
在multiprocessing中,每一个进程都用一个Process类来表示。它的API调用如下:
Process([group [, target [, name [, args [, kwargs]]]]])
- target 表示调用对象,你可以传入方法的名字
- args 表示被调用对象的位置参数是元组
- kwargs 表示调用对象的字典
- name 是别名,给进程起一个名字
- group 分组
【示例】如下:
import multiprocessing
def process(index):
'''
multiprocessing.current_process().name 获取进程名字
'''
print(f'index:{index}', multiprocessing.current_process().name)
if __name__ == '__main__':
for i in range(5):
# target 传入函数
# args 参数
# name 自定义进程名
p = multiprocessing.Process(target=process, args=(i,), name=str(i))
p.start()
结果自己运行看下。。。
上面代码,可以看出,循环5次,相当于我们运行了5个子进程,每一个进程调用了Process方法,Process方法的index参数通过Process的args传入,分别是0-4这个5个数字,最后打印出来,5个子进程运行结束。
由于进程是Python中最小的资源分配单元,因此这些进程和线程不同,各个进程之间的数据是不会共享的,每启动一个进程,都会独立分配资源。
另外,在CPU核数足够的情况下,这些不同的进程会分配给不同的CPU核来运行,实现真正的并行执行。
multiprocessing提供了几个比较有用的方法,如我们可以通过cpu_count的方法来获取当前机器cpu的核心数量,通过actice_children方法获取当前还在运行的所有进程。
multiprocessing.cpu_count() #返回cpu的数量
multiprocessing.active_children() #返回是列表,返回当前正在活跃运行的进程列表
继承Process类
我们也可以像创建线程Thread一样来通过继承的方式创建一个进程类,进程的基本操作我们在子类的run方法中实现即可。
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, index):
Process.__init__(self)
self.index = index
def run(self):
for i in range(self.index):
time.sleep(1)
print(f"pid:{self.pid},name:{self.name},index:{i}")
if __name__ == '__main__':
for i in range(1, 5):
p = MyProcess(i)
p.start()
跟线程继承Thread一样,首先需要声明一个构造方法,这个方法接收一个index参数,代表循环次数,并将其设置为全局变量,在run方法中,又使用这个index变量循环index次比打印了当前的进程号、进程名程以及循环次数。
在调用时,用range方法得到1-4四个数字,并把它们分别初始化了MyProcess进程,再调用start方法将进程启动起来。
为了复用方便,可以把一些方法写在每一个进程类里封装好,在使用时直接初始化一个进程类即可。
守护进程
在多进程中,同样存在守护进程的概念,如果一个进程被设置为守护进程,当父进程结束后,子进程会自动被终止,我们可以通过daemon属性来控制是否为守护进程。
用上面的例子,增加daemon属性设置,例如:(使用继承Process类下面的代码)
...
...
...
if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
p.daemon=True #比上面代码,多加这一行
p.start()
运行发现,返回为空,因为主进程没有做任何事,所以直接结束,所以在这时也直接终止了子进程的运行。
加daemon的优势在于,可以有效防止无控制地生成子进程,只要主进程结束后无需担心子进程是否关闭,避免了独立子进程的运行。
进程等待
按照上面实现,其实又一个问题就是,在主进程结果后,子进程(守护进程)也都退出,子进程还没有来的急执行,就结束了。这不符合我们的预期。
解决这个问题,只需要加入join方法即可,代码修改如下:(使用继承Process类下面的代码)
...
...
...
if __name__ == '__main__':
processes=[]
for i in range(2, 5):
p = MyProcess(i)
processes.append(p)
p.daemon = True
p.start()
for j in processes:
j.join()
#j.join(1) #设置等待时间
在调用start和join方法后,父进程就可以等待所有子进程都执行完毕后,在打印结束的结果。
在默认的情况,join是无期限的,只要子进程没有运行完毕,主进程将一直在等待,如子进程陷入死循环,主进程也会一直在等待,要解决这个问题,可以给join方法传递一个超时参数,代表最长等待秒数,如果子进程没有在这个指定秒数完成,会被强制返回,主进程不会在等待,也就是说这个参数设置了主进程等待子进程的最长时间。
以上就是守护进程、进程等待和超时设置的用法。
终止进程
终止进程不止有守护进程这一种做法,也可以通过terminate方法来终止某个子进程,另外我们可以通过is_alive方法判断进程是否还在执行
【示列】如下:
import multiprocessing
import time
def processes():
print('starting')
time.sleep(2)
print('Finished')
if __name__ == '__main__':
p = multiprocessing.Process(target=processes)
print(p, p.is_alive()) #返回是False 进程还没有启动
p.start()
print(p, p.is_alive()) #返回是True 进程已经启动
p.terminate() #将进程终止
print(p,p.is_alive()) # 此处返回是True
p.join()
print(p, p.is_alive()) # 返回是False
我们用Process创建一个进程,接着调用start方法启动这个进程,然后在调用terminate方法将进程终止,最后在调用join方法。
在进程运行的不同阶段,可以通过is_alive方法判断进程是否还在运行。
在运行后发现(这里不放执行返回内容,跟后面注释差不多),发现在调用terminate方法之后,用is_alive方法获取进程状态发现依然还是运行状态,在调用join方法之后,is_alive方法获取进程的运行状态才变为终止状态。
所以,在调用 terminate 方法之后,记得要调用一下 join 方法,这里调用 join 方法可以为进程提供时间来更新对象状态,用来反映出最终的进程终止效果。
进程互斥锁
在运行上面的一些示例中,我们会发现运行后输出的结果没有换行,这是什么原因呢?
这种情况是由多个进程并行执行导致的,两个进程同时输出,结果第一个进程的换行没有来的急输出,第二个进程就输出了结果,导致最终输出没有换行。
要想解决这种问题,解决方案实际上就是实现了进程互斥,避免了多个进程同时抢占临界区(输出)资源,我们可以通过multiprocessing中的Lock来实现,Lock锁,在一个进程输出时,加锁,其他进程等待。等此进程执行结束后,释放锁,其他进程可以输出。(这里锁的用法跟线程的用法是一样的)
【示例】如下:
from multiprocessing import Process, Lock
class MyProcess(Process):
def __init__(self, index,lock):
Process.__init__(self)
self.index = index
self.lock=lock
def run(self):
for i in range(self.index):
self.lock.acquire()
print('pid:', self.pid, 'name:', self.name)
self.lock.release()
if __name__ == '__main__':
lock = Lock()
processes=[]
for i in range(1, 5):
p = MyProcess(i,lock)
processes.append(p)
p.start()
for j in processes:
j.join()
大家可以吧,锁的部分注释掉,分别执行,,就可以看到效果了。
使用Lock可以有效避免进程同时占用资源而导致的一些问题。
信号量
进程互斥锁可以使同一时刻只有一个进程能访问共享资源,如上面的例子所展示的,在同一时刻只能有一个进程输出结果。但有时候我们需要允许多个进程来访问共享资源,同时还需要限制能访问共享资源的进程的数量。
如果要实现,我们可以用信号量,信号量是进程同步过程中一个比较重要的角色。它可以控制临界资源的数量,实现多个进程同时访问共享资源,限制进程的并发量。
我们可以用multiprocessing库中的Semaphore来实现信号量。
我们实例来演示胰一下进程之间利用Semaphore做到多个进程共享资源,同时又限制同时可访问的进程数量,
注意:信号量Semaphore是一个计数器,控制对公共资源或者临界区域的访问量,每一次有一个进程获取信号量时,计数器-1,若计数器为0时,其它进程就停止访问信号量,一直堵塞直到其它进程释放信号量。
acquire() 这是请求一个信号量,信号量减一
release() 这是请求释放一个信号量,信号量加一
【示例】如下:
from multiprocessing import Process, Semaphore, Lock, Queue
import time
buffer = Queue(10) # 共享队列
empty = Semaphore(3) #设置最大信号量,empty代表缓冲区空余数量
full = Semaphore(0) #代表缓冲区占用区
lock = Lock()
# 消费者
class Consumer(Process):
def __init__(self, buffer, empty, full, lock):
Process.__init__(self)
self.buffer = buffer
self.lock = lock
self.empty = empty
self.full = full
def run(self):
while True:
self.full.acquire() #缓冲区占用区 -1
self.lock.acquire()
self.buffer.get() #队列消费
print('消费者弹出一个元素')
time.sleep(1)
self.lock.release()
self.empty.release() #缓冲区空余数 +1
print()
# 生产者
class Producer(Process):
def __init__(self, buffer, empty, full, lock):
Process.__init__(self)
self.buffer = buffer
self.lock = lock
self.empty = empty
self.full = full
def run(self):
while True:
self.empty.acquire() #此处缓冲区空余数 -1
self.lock.acquire()
self.buffer.put(1) #队列写入1
print('生产者附加一个元素')
time.sleep(1)
self.lock.release()
self.full.release() #缓冲区占用数 +1
if __name__ == '__main__':
p = Producer(buffer=buffer, full=full, empty=empty, lock=lock)
c = Consumer(buffer=buffer, full=full, empty=empty, lock=lock)
p.daemon = c.daemon = True
p.start()
c.start()
p.join()
c.join()
print('主进程结束')
执行返回,这里就不放了,如有问题,留言。。
上述代码实现了经典的生产者和消费者问题,定义了两个进程类,一个消费者,一个是生产者。
另外,上述代码使用了multiprocessing中Queue定义了一个共享队列,然后定义了两个信号量Swmaphore,一个代表缓冲区空余数,一个代表缓冲区占用数。
生产者Producer使用acquire方法来占用一个缓冲区位置,缓冲区空闲区大小减一,接下来进行加锁,对缓冲区进行操作,然后释放锁,最后让代表占用的缓冲区位置数量加一,消费者则相反。
队列
上述的例子中,我们使用了Queue作为进程通信的共享队列使用。
如果我们把上面程序中Queue换成普通的list,是完全起不到效果的。因为进程和进程之间的资源是不共享的,即使在一个进程中改变了这个list,在另一个进程也不能获取到这个list的状态,所以声明全局变量对进程是没有用处的。
在进程共享数据,可以使用Queue,即队列,当然这里的队列指的是,multiprocessing里面的Queue。
代码同上,稍微修改一下即可,(如有问题留言)
只要在生产者在放数据的时候调用了Queue的put方法,消费者在取的时候用get方法,这样就可以通过Queue实现两个进程的数据共享了。
管道
上面使用Queue实现了进程间的数据共享,那么进程之间直接通信,如收发信息,我们可以使用Pipe,管道。
管道,可以理解为两个进程之间通信的通道。管道可以是单向的,即half-duplex;一个进程负责发消息,一个进程负责收消息,也可以是双向的duplex,即互相收发消息。
默认声明Pipe对象是双向管道,如果要创建单向管道,可以在初始化的时候传入duplex参数为Flase。
注:一. Pipe方法返回(pipe[0],pipe[1])代表一个管道的两个端。Pipe方法有duplex参数,默认为True,那么管道是双全工模式,也就是pipe[0],pipe[1]都可以收发。duplex为False,pipe[0]只负责接收消息,pipe[1]只负责发送消息。
二. send和recv方法分别是发送和接收消息的方法,close方法表示关闭管道
【示例】如下:
from multiprocessing import Process, Pipe
class Consumer(Process):
def __init__(self, pipe):
Process.__init__(self)
self.pipe = pipe
def run(self):
self.pipe.send('哈雷彗星,这里是消费者')
print('我消费者', self.pipe.recv())
class Producer(Process):
def __init__(self, pipe):
Process.__init__(self)
self.pipe = pipe
def run(self):
print('我生产者', self.pipe.recv())
self.pipe.send('哈雷彗星,这里是生产者')
if __name__ == '__main__':
# pipe = Pipe(duplex=False)
pipe = Pipe()
p = Producer(pipe[1])
c = Consumer(pipe[0])
p.daemon=c.daemon=True
p.start()
c.start()
p.join()
c.join()
执行结果,就不放了,大家自己执行一下吧,,有问题留言。
管道 Pipe 就像进程之间搭建的桥梁,利用它我们就可以很方便地实现进程间通信了。
进程池
上述中,讲到我们可以使用Process来创建进程,同时也讲来如何使用Semaphore(信号量)来控制进程的并发执行数量。
如果我们有一个需求是:有10000个任务,每一个任务需要一个进程来执行,并且进程运行完毕之后要紧接这启动下一个进程,同时还需要控制进程的并发量,不能并发太高,不然CPU处理不过来。
这个如何实现呢?用Process和Semaphore可以实现,但是实现取来比较繁琐,而这种需求在平时又是非常常见的,此时,就可以使用进程池来是实现,即multiprocessing中的Pool。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool时,如果池没有满,就就创建一个新的进程用来执行该请求;如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行它。
【示例一】如下:
from multiprocessing import Pool
import time
def function(index):
print(f'Start process: {index}')
time.sleep(3)
print(f'End process {index}', )
if __name__ == '__main__':
pool = Pool(processes=3) # 设置进程为3
for i in range(4):
pool.apply_async(func=function, args=(i,))
print('Main Process started')
pool.close()
pool.join()
print('Main Process ended')
最后,要记得调用close方法来关闭进程池,使其不再接受新的任务。然后调用join方法让主进程等待子进程的退出,等子进程运行完毕后,主进程接着运行并结束。
其实进程池有一个更好用的map方法,可以将上述方法简化很多。
map方法是怎么使用的呢?第一个参数就是要启动的进程对应执行方法,第二个参数是一个可迭代对象,其中的每个元素会被传递给这个执行方法。
举一个例子:现在我们有一个list,里面包含了很多URL,另外也定义了一个方法用来抓取每个URL内容并解析,那么我们可以直接在map的第一个参数传入方法名,第2个参数传入URL数组。
【示例二】如下:
from multiprocessing import Pool
import requests
def scrape(url):
try:
req = requests.get(url)
print(f'URL:{url} Scraped')
except:
print(f"URL:{url} not Scraped")
if __name__ == '__main__':
pool = Pool(processes=3)
urls = [
'https://www.baidu.com',
'http://www.meituan.com/',
'http://blog.csdn.net/',
'http://xxxyxxx.net'
]
pool.map(scrape, urls)
pool.close()
首先要初始化一个Pool,指定进程数为3,然后声明一个urls列表,接着调用了map方法,第一参数就是进程对应的执行方法,第二个参数就是urls列表,map方法会依次将urls的每一个元素作为scrape的参数传递并启动一个新的进程,加到进程池中执行。