Python使用future处理并发问题方案详解
lijiachang8 人气:0从Python3.2引入的concurrent.futures模块,Python2.5以上需要在pypi中安装futures包。
future指一种对象,表示异步执行的操作。这个概念的作用很大,是concurrent.futures模块和asyncio包的基础。
网络下载的三种风格
为了高效的处理网络IO,需要使用并发,因为网络有很高的延迟,所以为了不浪费CPU周期去等待,最好再收到网络响应之前去做其他的事情。
下面有三种示例程序,
第一个程序是依序下载的,第二个是使用theadpool来自concurrent.futures模块,第三个是使用asyncio包
按照顺序下载
下面示例是依次下载
import os import time import sys import requests POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split() BASE_URL = 'http://flupy.org/data/flags' DEST_DIR = 'downloads/' def sava_flag(img, filename): path = os.path.join(DEST_DIR, filename) with open(path, 'wb') as fp: fp.write(img) def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = requests.get(url) return resp.content def show(text): print(text, end=" ") sys.stdout.flush() # 能在一行中显示 def download_many(cc_list): for cc in sorted(cc_list): image = get_flag(cc) show(cc) sava_flag(image, cc.lower() + ".gif") return len(cc_list) def main(): t0 = time.time() count = download_many(POP20_CC) elapsed = time.time() - t0 msg = '\n{} flags download in {:.2f}s' print(msg.format(count, elapsed)) if __name__ == '__main__': main()
打印
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
20 flags download in 24.16s
知识点:
- 按照惯例,requests不在标准库中,在导入标准库之后,用一个空行分隔开
- sys.stdout.flush() : 显示一个字符串,然后刷新sys.stdout,这样能在一行消息中看到进度。Python中正常情况下,遇到换行才会刷新stdout缓冲。
使用conrurrent.futures模块多线程下载
concurrent.futures模块的主要特色是TheadPoolExecutor和ProcessPoolExecutor类,这两个类实现的结构能分别在不同线程或者进程中执行可调用对象。
这两个类内部维护着一个工作线程池或者进程池,以及要执行的任务队列。不过,这个接口抽象的层级很高,无需关心任何实现细节。
下面展示如何使用TheadPoolExecutor.map方法,最简单的方式实现并发下载。
import os import time import sys from concurrent import futures import requests POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split() BASE_URL = 'http://flupy.org/data/flags' DEST_DIR = 'downloads/' max_workers = 20 # 设定线程数 def sava_flag(img, filename): path = os.path.join(DEST_DIR, filename) with open(path, 'wb') as fp: fp.write(img) def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = requests.get(url) return resp.content def show(text): print(text, end=" ") sys.stdout.flush() # 能在一行中显示 def download_one(cc): image = get_flag(cc) show(cc) sava_flag(image, cc.lower() + ".gif") def download_many(cc_list): works = min(len(cc_list), max_workers) # 取其中的最小值,以免创建多余的线程 with futures.ThreadPoolExecutor(works) as executor: res = executor.map(download_one, cc_list) return len(list(res)) def main(): t0 = time.time() count = download_many(POP20_CC) elapsed = time.time() - t0 msg = '\n{} flags download in {:.2f}s' print(msg.format(count, elapsed)) if __name__ == '__main__': main()
打印
FR IN RU ID BD JP CN VN TR CD PH NG DE ET US EG IR BR MX PK
20 flags download in 2.92s
知识点:
- 使用线程数实例化ThreadPoolExecute类。executor.__exit__方法会调用executor.shutdown(wait=True)方法,它会在所有线程执行完毕前阻塞线程。
- executor.map方法的作用于内置map函数类似,区别是在多个线程中并发调用,此map方法会返回一个生成器,因此可以迭代,获取各个函数返回的值。
- return len(list(res))这里要注意下,读取res也就是executor.map各个函数的返回值,如果线程中有异常,会在这里抛出,这与隐式调用next()函数从迭代器中获取相应的返回值一样。
使用asyncio异步下载
后续章节会介绍
future是什么
从Python3.4开始,标准库中有两个名为Future的类:concurrent.futures.Future和asyncio.Future
这两个类的作用相同:两个future类的实例都可以表示已经完成或者尚未完成的延迟计算。这与Twister引擎中的Deferred类、Tornado框架中的Future类,以及多个JavaScript库中的Promise对象类似。
future封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果(或抛出异常)。
通常情况下自己不应该创建future,而只能由并发框架concurrent.futures和asyncio实例化。
原因很简单:future表示终将发生的事情,而确定某件时间发生的唯一方式是执行的时间(顺序)已经排定。因此,只有排定把某件事情交给concurrent.futures.Executor子类处理时,才会创建concurrent.futures.Future实例。
Executor.submit()方法的参数是一个可调用的对象,调用这个方法后会为传入的可调用对象排期,并返回一个future。
客户端代码不应该改变future的状态,并发框架在future表示的延迟计算结束后会改变future的状态,而我们无法控制计算何时结束。
两种future都有.done()方法,这个方法并不阻塞,返回值是布尔值,指明future链接的可调用对象是否已经执行。
客户端代码通常不会询问future是否运行结束,而是会等待通知。因
两个Future类都有.add_done_callback()方法:这个方法只有一个参数,类型是可调用对象,future运行结束后会调用此可调用对象。
还有.result()方法,如果在future运行结束后调用的haunt,这个方法在两个Future类的作用相同:返回可调用对象的结果,或者抛出异常(重新抛出执行可调用对象时抛出的异常)。可是,如果future没有运行结束,result方法在两个Future类中的行为相差很大:
对于concurrent.futures.Future实例来说,调用了f.result()方法会阻塞调用方所在的线程,直到有结果返回,此时的result方法,可以接受可选的timeout参数,如果在指定实现内future没有运行完毕,会抛出TimeoutError异常。
对于asyncio.Future.result方法,不支持设置timeout超时时间,在那个库中获取future结果最好使用yield from结构。
这两个库中有几个函数会返回future,其他函数则使用future以用户易于理解的方式实现自身。
Executor.map方法是使用future:返回值是一个迭代器,迭代器的__next__方法调用各个future的result方法,因此得到各个future的结果。
concurrent.futures.as_completed函数参数是一个列表,返回值是一个迭代器,在future运行结束后产出future。
为了理解future,使用as_completed函数,把较为抽象的executor.map调换成两个for循环:一个用户创建并排定future(使用summit方法),一个用于获取future的结果。
示例,一窥神秘的future。
...其余代码省略 def download_one(cc): image = get_flag(cc) show(cc) sava_flag(image, cc.lower() + ".gif") return cc def download_many(cc_list): cc_list = cc_list[:5] # 这次演示5个国家 future_list = [] with futures.ThreadPoolExecutor(max_workers=3) as executor: # 线程池为3,便于观察 for cc in sorted(cc_list): future = executor.submit(download_one, cc) # submit方法排定可调用对象的执行时间,然后返回一个future,表示待执行操作 future_list.append(future) print("{}: {}".format(cc, future)) res = [] for future in futures.as_completed(future_list): # as_completed函数在future都执行完毕后,返回future result = future.result() # 获取future结果 print('{} result: {!r}'.format(future, result)) res.append(result)
打印
BR: <Future at 0x3af2870 state=running>
CN: <Future at 0x3af2c90 state=running>
ID: <Future at 0x3af2ff0 state=running>
IN: <Future at 0x3aff3b0 state=pending>
US: <Future at 0x3aff410 state=pending>
CN <Future at 0x3af2c90 state=finished returned str> result: 'CN'
ID <Future at 0x3af2ff0 state=finished returned str> result: 'ID'
BR <Future at 0x3af2870 state=finished returned str> result: 'BR'
IN <Future at 0x3aff3b0 state=finished returned str> result: 'IN'
US <Future at 0x3aff410 state=finished returned str> result: 'US'
知识点:
- summit方法把一个可执行对象(函数),变为一个future对象,并且记录了执行时间,表示待执行打操作。
- futures.as_completed在所有的future执行完毕后,产出future对象。而后可以使用future.result()获取结果
- 直接打印future对象(调用future的repr()方法),会显示future的状态,例如running、pending(等待)、finished
GIL和阻塞型I/O
严格来说,上面实现的多线程并发脚本都不能实现并行下载。
使用concurrent.futures库实现的示例,都会受到GIL(Global Interpreter Lock,全局解释器锁)的限制,脚本只能在单个线程中执行。
CPython解释器本身就不是线程安全的,因此会有全局解释器锁GIL,一次只允许使用一个线程执行Python字节码。
因此一个Python进程通常不能同时使用多个CPU核心。(在Jython和IronPython中没有此限制,目前最快的PyPy解释器也存在GIL) IronPython是.net实现的
Python代码无法控制GIL,然而,标准库中所以执行阻塞型IO操作的函数,在等待操作系统返回结果时都会释放GIL。这意味着在Python语言这个层次上可以使用多线程,而IO密集型操作能从中受益:一个Python线程等待网络响应时,阻塞型IO函数会释放GIL,再运行一个线程。比如time.sleep()函数也会释放GIL。
GIL简化了CPython和C语言扩展的实现。得益于GIL,Python有很多C语言扩展。
使用concurrent.futures模块多进程
在处理CPU密集型操作时,可以使用多进程,实现真正的并行计算。
使用ProcessPoolExecutor类把任务分配给多个Python进程处理。因此如果需要做CPU密集型操作,使用这个模块多进程能绕开GIL,利用所有可用的CPU核心。
ProcessPoolExecutor和ThreadPoolExecutor类都实现了通用的Executor接口,因此使用concurrent.futures模块能轻松的把基于线程的方案转成基于进程的方案。
这两个实现Executor接口的类,唯一的区别是,ThreadPoolExecutor.__init__方法需要max_workers参数,指定线程池中线程的数量。
但是在ProcessPoolExecutor类中,这个参数是可选的,而且大多数情况下使用默认值:os.cpu_count()函数返回的CPU数量。因为对于CPU密集型操作来说,不可能要求使用超过CPU数量的进程。
经过测试,使用ProcessPoolExecutor实例下载20个国旗的时间,要比ThreadPoolExecutor要慢,主要原因是我电脑是四核八线程,八个逻辑处理器,因此限制只有4个并发下载,而使用线程池的版本有20个工作线程。
ProcessPoolExecutor的价值体现在CPU密集型操作上。比如对于加密算法上,使用ProcessPoolExecutor类派生出四个工作进程后,性能可以提高两倍。
如果使用PyPy比CPython相比,速度又能提高3.8倍。所以使用Python进行CPU密集型操作,应该试试PyPy,普遍快3.8~5.1倍。
实验Executor.map方法
若想并发运行多个可调用对象,最简单是是使用Executor.map方法。
示例,演示Executor.map方法的某些运作细节
import time from concurrent import futures def display(*args): """把参数打印前,加上时间显示""" print(time.strftime('[%H:%M:%S]'), end=" ") print(*args) def loiter(n): """开始时显示一个消息,然后休眠n秒,最后再结束的时候在显示一个消息 消息使用制表符缩进,缩进量由n值确定 loiter:徘徊,闲着,闲荡 """ msg = '{}loiter({}):doing nothing for {}s' display(msg.format('\t' * n, n, n)) time.sleep(n) msg = '{}loiter({}):done' display(msg.format('\t' * n, n)) return n * 10 # 随意返回一个结果 def main(): display('Script starting.') # 脚本开始 executor = futures.ThreadPoolExecutor(max_workers=3) results = executor.map(loiter, range(5)) # 把5个任务交个3个线程 display('results :', results) # 打印调用executor.map的结果,是一个生成器 display('waiting for individual results:') # 等待个体结果 for i, result in enumerate(results): display('result {}: {}'.format(i, result)) if __name__ == '__main__': main()
打印
[17:40:08] Script starting.
[17:40:08] loiter(0):doing nothing for 0s
[17:40:08] loiter(0):done
[17:40:08] loiter(1):doing nothing for 1s
[17:40:08] loiter(2):doing nothing for 2s
[17:40:08][17:40:08] results : loiter(3):doing nothing for 3s
<generator object Executor.map.<locals>.result_iterator at 0x0318CDB0>
[17:40:08] waiting for individual results:
[17:40:08] result 0: 0
[17:40:09] loiter(1):done
[17:40:09] loiter(4):doing nothing for 4s
[17:40:09] result 1: 10
[17:40:10] loiter(2):done
[17:40:10] result 2: 20
[17:40:11] loiter(3):done
[17:40:11] result 3: 30
[17:40:13] loiter(4):done
[17:40:13] result 4: 40
知识点:
- 示例中把5个任务交给executor(3个线程),其中前三个任务会立即开始;这是非阻塞调用。
- 在for循环中会隐式调用next(results),这个函数又会在第一个任务的future上调用future.result()方法。result方法会阻塞,直到这个future运行结束。所以这个for循环每次迭代都会阻塞,等到结果出来后,才会继续。
- 每次的打印结果都可能不一样。由于sleep函数总会释放GIL,即使是sleep(0),所以loiter(1)有可能在loiter(0)结束之前开始运行,但是这个示例中没有。三个线程是同时开始。
- executor.map的结果是一个生成器,这个操作不会阻塞。
- loiter(0)的结果result 0: 0打印没有阻塞的原因是,在for循环之前future已经执行完成,可以看到输出了done。
综上,Executor.map函数易于使用,有个特征算是优点,但也可能没用变成缺点,具体情况取决于需求:map函数返回的结果顺序于调用开始的顺序一致。
如果第一个任务生成结果用时10秒,而其他任务调用只用1秒,代码就会阻塞10秒,获取map方法返回生成器的第一个结果。在此之后,获取后续结果时不会阻塞,因为后续的调用已经结束,所以,获取循环所有结果,阻塞的用时等于最长的任务时间。
如果必须等待获取所有结果后再处理的场景,这种行为没问题;不过,通常更常用的方式是,不管提交的顺序,只有有结果就获取。这样就要使用executor.submit方法和futures.as_completed函数结合起来使用。
executor.submit和futures.as_completed这个组合比executor.map更灵活:
- 因为submit方法能处理不同的可调用对象和参数,而executor.map只能处理参数不同的同一个可调用对象
- 此外,传给future.as_completed函数的future集合可以来自多个Executor实例
- futures.as_completed只返回已经运行结束的future
显示下载进度条
Python内置库有tqdm包,taqadum在阿拉伯语中的意思是进展。
tqdm可以在长循环中添加一个进度提示信息,用户只需要封装任意的迭代器 tqdm(iterator)
import time from tqdm import tqdm for i in tqdm(range(1000)): time.sleep(.01) 100%|██████████| 1000/1000 [00:10<00:00, 95.34it/s]
tqdm能处理任何可迭代对象,生成一个迭代器;使用这个迭代器时,显示进度条和完成全部迭代
为了计算剩余时间,tqdm函数要获取可以使用len函数的可迭代对象,或者在第二个参数中指定预期的元素数量。
例如,futures.as_completed函数的结果,就不支持len函数,只能使用tdqm的第二个参数total=来指定数量。
网络下载增加错误处理和进度条
下面的示例中负责下载一个文件的函数(download_one)中使用相同的策略处理HTTP 404错误。其他异常则向上冒泡,交给download_many函数处理。
示例,负责下载的基本函数。
def get_flag(base_url, cc): url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower()) resp = requests.get(url) if resp.status_code != 200: resp.raise_for_status() # 如果状态码不是200,产生一个HttpError的异常 return resp.content def download_one(cc, base_url, verbose=False): try: image = get_flag(base_url, cc) except requests.exceptions.HTTPError as exc: res = exc.response if res.status_code == 404: status = HTTPStatus.not_found msg = 'not found' else: # 如果不是404异常,向上冒泡,传给调用方 raise else: sava_flag(image, cc.lower() + '.gif') status = HTTPStatus.ok msg = 'ok' if verbose: print(cc, msg) return Result(status, cc)
示例,依序下载的download_many函数
def download_many(cc_list, base_url, verbose, max_req): """实现依序下载""" counter = collections.Counter() # 统计不同的下载状态:HTTPStatus.ok、HTTPStatus.not_found、HTTPStatus.error cc_iter = sorted(cc_list) if not verbose: cc_iter = tqdm.tqdm(cc_iter) # 如果不需要详细模式,就使用进度条展示 for cc in cc_iter: try: res = download_one(cc, base_url, verbose) except requests.exceptions.HTTPError as exc: error_msg = "HTTP error {res.status_code} - {res.reason}" error_msg = error_msg.format(res=exc.response) except requests.exceptions.ConnectionError as exc: error_msg = 'Connection error' else: error_msg = '' status = res.status if error_msg: status = HTTPStatus.error counter[status] += 1 if verbose and error_msg: print('*** Error for {}:{}'.format(cc, error_msg)) return counter
知识点:
requests.exceptions中有所有的requests相关的异常类,可以用来捕获相关异常。
如果有响应信息后,产生的异常,异常对象exc.response的status_code状态码和reason异常原因
示例,多线程下载的download_many函数
default_concur_req = 30 # 默认的线程池大小 max_concur_req = 1000 # 最大并发请求数,这是一个安全措施 def download_many(cc_list, base_url, verbose, concur_req): counter = collections.Counter() with futures.ThreadPoolExecutor(max_workers=concur_req) as executor: to_do_map = {} for cc in sorted(cc_list): future = executor.submit(download_one, cc, base_url, verbose) # submit排定一个可调用对象的执行时间,返回一个Future实例 to_do_map[future] = cc # 把各个Future实例映射到国家代码上,在错误处理时使用 done_iter = futures.as_completed(to_do_map) # 返回一个迭代器,在future运行结束后产出future if not verbose: done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # 如果不是详细模式,就显示进度条,因为done_iter没有len函数,只能通过total参数传入 for future in done_iter: try: res = future.result() except requests.exceptions.HTTPError as exc: error_msg = 'HTTP {res.status_code} - {res.reason}' error_msg = error_msg.format(res=exc.response) except requests.exceptions.ConnectionError as exc: error_msg = "Connection error" else: error_msg = "" status = res.status if error_msg: status = HTTPStatus.error counter[status] +=1 if verbose and error_msg: cc = to_do_map[future] print('*** Error for {}:{}'.format(cc, error_msg)) return counter
知识点:
对futures.as_completed函数的惯用法:构建一个字典,把各个future映射到其他数据上,future运行结束后可能会有用。比如上述示例,把future映射到国家代码上。
线程和多进程的代替方案
对于多线程,如果futures.ThreadPoolExecutor类对某个作业来首不够灵活,可能要使用到threading模块中的组件(如Thread、Lock、Semaphore等)自行制定方案,
比如使用queue模块创建线程安全的队列,在线程之间传递数据。futures.ThreadPoolExecutor类已经封装好了这些组件。
对于CPU密集型工作来说,要启动多个进程,规避GIL。创建多个进程的最简单方式是用futures.ProcessPoolExecutor类。如果使用场景较复杂,需要更高级的工具,multiprocessing模块的API和threading模块相仿,不过作业交给多个进程处理。
加载全部内容