运用多线程(threading)和多历程(multiprocessing)完成通例的并发需求,在启动的时刻 start、join 等步骤不能省,庞杂的须要还要用 1-2 个行列。
跟着需求愈来愈庞杂,假如没有优越的设想和笼统这部份的功用条理,代码量越多调试的难度就越大。
关于须要并发实行、然则对及时性要求不高的使命,我们能够运用 concurrent.futures 包中的 PoolExecutor 类来完成。
这个包供应了两个实行器:线程池实行器 ThreadPoolExecutor 和历程池实行器 ProcessPoolExecutor,两个实行器供应一样的 API。
池的观点重要目标是为了重用:让线程或历程在生命周期内能够屡次运用。它减少了建立建立线程和历程的开支,提高了顺序机能。重用不是必需的划定规矩,但它是顺序员在运用中运用池的重要原因。
池,只要牢固个数的线程/历程,经由过程 max_workers 指定。
使命经由过程 executor.submit 提交到 executor 的使命行列,返回一个 future 对象。
Future 是罕见的一种并发设想形式。
一个Future对象代表了一些还没有停当(完成)的效果,在「未来」的某个时候停当了以后就能够猎取到这个效果。
使命被调理到各个 workers 中实行。
然则要注意,一个使命一旦被实行,在实行终了前,会一向占用该 worker!假如 workers 不够用,其他的使命会一向守候!因而 PoolExecutor 不适合及时使命。
import concurrent.futures import time from itertools import count number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] def evaluate_item(x): for i in count(x): # count 是无穷迭代器,会一向递增。 print(f"{x} - {i}") time.sleep(0.01) if __name__ == "__main__": # 历程池 start_time_2 = time.time() # 运用 with 在脱离此代码块时,自动挪用 executor.shutdown(wait=true) 开释 executor 资本 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # 将 10 个使命提交给 executor,并网络 futures futures = [executor.submit(evaluate_item, item) for item in number_list] # as_completed 要领守候 futures 中的 future 完成 # 一旦某个 future 完成,as_completed 就马上返回该 future # 这个要领,使每次返回的 future,老是最早完成的 future # 而不是先守候使命 1,再守候使命 2... for future in concurrent.futures.as_completed(futures): print(future.result()) print ("Thread pool execution in " + str(time.time() - start_time_2), "seconds")
上面的代码中,item 为 1 2 3 4 5 的五个使命会一向占用一切的 workers,而 6 7 8 9 10 这五个使命会永久守候!!!
API 细致申明
concurrent.futures 包括三个部份的 API:
PoolExecutor:也就是两个实行器的 API
组织器:重要的参数是 max_workers,用于指定线程池大小(或许说 workers 个数)
submit(fn, *args, **kwargs):将使命函数 fn 提交到实行器,args 和 kwargs 就是 fn 须要的参数。
返回一个 future,用于猎取效果
map(func, *iterables, timeout=None, chunksize=1):当使命是同一个,只要参数不同时,能够用这个要领替代 submit。iterables 的每一个元素对应 func 的一组参数。
返回一个 futures 的迭代器
shutdown(wait=True):封闭实行器,平常都运用 with 管理器自动封闭。
Future:使命被提交给实行器后,会返回一个 future
future.result(timout=None):最经常使用的要领,返回使命的效果。假如使命还没有完毕,这个要领会一向守候!
timeout 指定超时时候,为 None 时没有超时限定。
exception(timeout=None):给出使命抛出的非常。和 result() 一样,也会守候使命完毕。
cancel():作废此使命
add_done_callback(fn):future 完成后,会实行 fn(future)。
running():是不是正在运转
done():future 是不是已完毕了,boolean
...详见官方文档
模块带有的有用函数
concurrent.futures.as_completed(fs, timeout=None):守候 fs (futures iterable)中的 future 完成
一旦 fs 中的某 future 完成了,这个函数就马上返回该 future。
这个要领,使每次返回的 future,老是最早完成的 future。而不是先守候使命 1,再守候使命 2...
常经由过程 for future in as_completed(fs): 运用此函数。
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):一向守候,直到 return_when 所指定的事发作,或许 timeout
return_when 有三个选项:ALL_COMPLETED(fs 中的 futures 悉数完成),FIRST__COMPLETED(fs 中恣意一个 future 完成)另有 FIRST_EXCEPTION(某使命抛出非常)
Future 设想形式
这里的 PoolExecutor 的特性,在于它运用了 Future 设想形式,使使命的实行,与效果的猎取,变成一个异步的流程。
我们先经由过程 submit/map 将使命放入使命行列,这时候使命就已最先实行了!然后我们在须要的时刻,经由过程 future 猎取效果,或许直接 add_done_callback(fn)。
这里使命的实行是在新的 workers 中的,主历程/线程不会壅塞,因而主线程能够干其他的事。这类体式格局被称作异步编程。
画外
concurrent.futures 基于 multiprocessing.pool 完成,因而实际上它比直接运用 线程/历程 的 Pool 要慢一点。然则它供应了更轻易简约的 API。
以上就是Python并发之PoolExecutor的引见(附示例)的细致内容,更多请关注ki4网别的相干文章!