亲宝软件园·资讯

展开

Python使用future处理并发问题方案详解

lijiachang8 人气:0

从Python3.2引入的concurrent.futures模块,Python2.5以上需要在pypi中安装futures包。

future指一种对象,表示异步执行的操作。这个概念的作用很大,是concurrent.futures模块和asyncio包的基础。

网络下载的三种风格

为了高效的处理网络IO,需要使用并发,因为网络有很高的延迟,所以为了不浪费CPU周期去等待,最好再收到网络响应之前去做其他的事情。

下面有三种示例程序,

第一个程序是依序下载的,第二个是使用theadpool来自concurrent.futures模块,第三个是使用asyncio包

按照顺序下载

下面示例是依次下载

import os
import time
import sys
import requests
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'
def sava_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)
def get_flag(cc):
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content
def show(text):
    print(text, end=" ")
    sys.stdout.flush()  # 能在一行中显示
def download_many(cc_list):
    for cc in sorted(cc_list):
        image = get_flag(cc)
        show(cc)
        sava_flag(image, cc.lower() + ".gif")
    return len(cc_list)
def main():
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags download in {:.2f}s'
    print(msg.format(count, elapsed))
if __name__ == '__main__':
    main()

打印
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags download in 24.16s

知识点:

使用conrurrent.futures模块多线程下载

concurrent.futures模块的主要特色是TheadPoolExecutor和ProcessPoolExecutor类,这两个类实现的结构能分别在不同线程或者进程中执行可调用对象。

这两个类内部维护着一个工作线程池或者进程池,以及要执行的任务队列。不过,这个接口抽象的层级很高,无需关心任何实现细节。

下面展示如何使用TheadPoolExecutor.map方法,最简单的方式实现并发下载。

import os
import time
import sys
from concurrent import futures
import requests
POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'
max_workers = 20  # 设定线程数
def sava_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)
def get_flag(cc):
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content
def show(text):
    print(text, end=" ")
    sys.stdout.flush()  # 能在一行中显示
def download_one(cc):
    image = get_flag(cc)
    show(cc)
    sava_flag(image, cc.lower() + ".gif")
def download_many(cc_list):
    works = min(len(cc_list), max_workers)  # 取其中的最小值,以免创建多余的线程
    with futures.ThreadPoolExecutor(works) as executor:
        res = executor.map(download_one, cc_list)
    return len(list(res))
def main():
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags download in {:.2f}s'
    print(msg.format(count, elapsed))
if __name__ == '__main__':
    main()

打印
FR IN RU ID BD JP CN VN TR CD PH NG DE ET US EG IR BR MX PK 
20 flags download in 2.92s

知识点:

使用asyncio异步下载

后续章节会介绍

future是什么

从Python3.4开始,标准库中有两个名为Future的类:concurrent.futures.Future和asyncio.Future

这两个类的作用相同:两个future类的实例都可以表示已经完成或者尚未完成的延迟计算。这与Twister引擎中的Deferred类、Tornado框架中的Future类,以及多个JavaScript库中的Promise对象类似。

future封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果(或抛出异常)。

通常情况下自己不应该创建future,而只能由并发框架concurrent.futures和asyncio实例化。

原因很简单:future表示终将发生的事情,而确定某件时间发生的唯一方式是执行的时间(顺序)已经排定。因此,只有排定把某件事情交给concurrent.futures.Executor子类处理时,才会创建concurrent.futures.Future实例。

Executor.submit()方法的参数是一个可调用的对象,调用这个方法后会为传入的可调用对象排期,并返回一个future。

客户端代码不应该改变future的状态,并发框架在future表示的延迟计算结束后会改变future的状态,而我们无法控制计算何时结束。

两种future都有.done()方法,这个方法并不阻塞,返回值是布尔值,指明future链接的可调用对象是否已经执行。

客户端代码通常不会询问future是否运行结束,而是会等待通知。因

两个Future类都有.add_done_callback()方法:这个方法只有一个参数,类型是可调用对象,future运行结束后会调用此可调用对象。

还有.result()方法,如果在future运行结束后调用的haunt,这个方法在两个Future类的作用相同:返回可调用对象的结果,或者抛出异常(重新抛出执行可调用对象时抛出的异常)。可是,如果future没有运行结束,result方法在两个Future类中的行为相差很大:

对于concurrent.futures.Future实例来说,调用了f.result()方法会阻塞调用方所在的线程,直到有结果返回,此时的result方法,可以接受可选的timeout参数,如果在指定实现内future没有运行完毕,会抛出TimeoutError异常。

对于asyncio.Future.result方法,不支持设置timeout超时时间,在那个库中获取future结果最好使用yield from结构。

这两个库中有几个函数会返回future,其他函数则使用future以用户易于理解的方式实现自身。

Executor.map方法是使用future:返回值是一个迭代器,迭代器的__next__方法调用各个future的result方法,因此得到各个future的结果。

concurrent.futures.as_completed函数参数是一个列表,返回值是一个迭代器,在future运行结束后产出future。

为了理解future,使用as_completed函数,把较为抽象的executor.map调换成两个for循环:一个用户创建并排定future(使用summit方法),一个用于获取future的结果。

示例,一窥神秘的future。

...其余代码省略
def download_one(cc):
    image = get_flag(cc)
    show(cc)
    sava_flag(image, cc.lower() + ".gif")
    return cc
def download_many(cc_list):
    cc_list = cc_list[:5]  # 这次演示5个国家
    future_list = []
    with futures.ThreadPoolExecutor(max_workers=3) as executor:  # 线程池为3,便于观察
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)  # submit方法排定可调用对象的执行时间,然后返回一个future,表示待执行操作
            future_list.append(future)
            print("{}: {}".format(cc, future))
        res = []
        for future in futures.as_completed(future_list):  # as_completed函数在future都执行完毕后,返回future
            result = future.result()  # 获取future结果
            print('{} result: {!r}'.format(future, result))
            res.append(result)

打印
BR: <Future at 0x3af2870 state=running>
CN: <Future at 0x3af2c90 state=running>
ID: <Future at 0x3af2ff0 state=running>
IN: <Future at 0x3aff3b0 state=pending>
US: <Future at 0x3aff410 state=pending>
CN <Future at 0x3af2c90 state=finished returned str> result: 'CN'
ID <Future at 0x3af2ff0 state=finished returned str> result: 'ID'
BR <Future at 0x3af2870 state=finished returned str> result: 'BR'
IN <Future at 0x3aff3b0 state=finished returned str> result: 'IN'
US <Future at 0x3aff410 state=finished returned str> result: 'US'

知识点:

GIL和阻塞型I/O

严格来说,上面实现的多线程并发脚本都不能实现并行下载。

使用concurrent.futures库实现的示例,都会受到GIL(Global Interpreter Lock,全局解释器锁)的限制,脚本只能在单个线程中执行。

CPython解释器本身就不是线程安全的,因此会有全局解释器锁GIL,一次只允许使用一个线程执行Python字节码。

因此一个Python进程通常不能同时使用多个CPU核心。(在Jython和IronPython中没有此限制,目前最快的PyPy解释器也存在GIL) IronPython是.net实现的

Python代码无法控制GIL,然而,标准库中所以执行阻塞型IO操作的函数,在等待操作系统返回结果时都会释放GIL。这意味着在Python语言这个层次上可以使用多线程,而IO密集型操作能从中受益:一个Python线程等待网络响应时,阻塞型IO函数会释放GIL,再运行一个线程。比如time.sleep()函数也会释放GIL。

GIL简化了CPython和C语言扩展的实现。得益于GIL,Python有很多C语言扩展。

使用concurrent.futures模块多进程

在处理CPU密集型操作时,可以使用多进程,实现真正的并行计算。

使用ProcessPoolExecutor类把任务分配给多个Python进程处理。因此如果需要做CPU密集型操作,使用这个模块多进程能绕开GIL,利用所有可用的CPU核心。

ProcessPoolExecutor和ThreadPoolExecutor类都实现了通用的Executor接口,因此使用concurrent.futures模块能轻松的把基于线程的方案转成基于进程的方案。

这两个实现Executor接口的类,唯一的区别是,ThreadPoolExecutor.__init__方法需要max_workers参数,指定线程池中线程的数量。

但是在ProcessPoolExecutor类中,这个参数是可选的,而且大多数情况下使用默认值:os.cpu_count()函数返回的CPU数量。因为对于CPU密集型操作来说,不可能要求使用超过CPU数量的进程。

经过测试,使用ProcessPoolExecutor实例下载20个国旗的时间,要比ThreadPoolExecutor要慢,主要原因是我电脑是四核八线程,八个逻辑处理器,因此限制只有4个并发下载,而使用线程池的版本有20个工作线程。

ProcessPoolExecutor的价值体现在CPU密集型操作上。比如对于加密算法上,使用ProcessPoolExecutor类派生出四个工作进程后,性能可以提高两倍。

如果使用PyPy比CPython相比,速度又能提高3.8倍。所以使用Python进行CPU密集型操作,应该试试PyPy,普遍快3.8~5.1倍。

实验Executor.map方法

若想并发运行多个可调用对象,最简单是是使用Executor.map方法。

示例,演示Executor.map方法的某些运作细节

import time
from concurrent import futures
def display(*args):
    """把参数打印前,加上时间显示"""
    print(time.strftime('[%H:%M:%S]'), end=" ")
    print(*args)
def loiter(n):
    """开始时显示一个消息,然后休眠n秒,最后再结束的时候在显示一个消息
        消息使用制表符缩进,缩进量由n值确定
        loiter:徘徊,闲着,闲荡
    """
    msg = '{}loiter({}):doing nothing for {}s'
    display(msg.format('\t' * n, n, n))
    time.sleep(n)
    msg = '{}loiter({}):done'
    display(msg.format('\t' * n, n))
    return n * 10  # 随意返回一个结果
def main():
    display('Script starting.')  # 脚本开始
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))  # 把5个任务交个3个线程
    display('results :', results) # 打印调用executor.map的结果,是一个生成器
    display('waiting for individual results:')  # 等待个体结果
    for i, result in enumerate(results):
        display('result {}: {}'.format(i, result))
if __name__ == '__main__':
    main()

打印
[17:40:08] Script starting.
[17:40:08] loiter(0):doing nothing for 0s
[17:40:08] loiter(0):done
[17:40:08]     loiter(1):doing nothing for 1s
[17:40:08]         loiter(2):doing nothing for 2s
[17:40:08][17:40:08]  results :            loiter(3):doing nothing for 3s 
<generator object Executor.map.<locals>.result_iterator at 0x0318CDB0>
[17:40:08] waiting for individual results:
[17:40:08] result 0: 0
[17:40:09]     loiter(1):done
[17:40:09]                 loiter(4):doing nothing for 4s
[17:40:09] result 1: 10
[17:40:10]         loiter(2):done
[17:40:10] result 2: 20
[17:40:11]             loiter(3):done
[17:40:11] result 3: 30
[17:40:13]                 loiter(4):done
[17:40:13] result 4: 40

知识点:

综上,Executor.map函数易于使用,有个特征算是优点,但也可能没用变成缺点,具体情况取决于需求:map函数返回的结果顺序于调用开始的顺序一致。

如果第一个任务生成结果用时10秒,而其他任务调用只用1秒,代码就会阻塞10秒,获取map方法返回生成器的第一个结果。在此之后,获取后续结果时不会阻塞,因为后续的调用已经结束,所以,获取循环所有结果,阻塞的用时等于最长的任务时间。

如果必须等待获取所有结果后再处理的场景,这种行为没问题;不过,通常更常用的方式是,不管提交的顺序,只有有结果就获取。这样就要使用executor.submit方法和futures.as_completed函数结合起来使用。

executor.submit和futures.as_completed这个组合比executor.map更灵活:

显示下载进度条

Python内置库有tqdm包,taqadum在阿拉伯语中的意思是进展。

tqdm可以在长循环中添加一个进度提示信息,用户只需要封装任意的迭代器 tqdm(iterator)

import time
from tqdm import tqdm
for i in tqdm(range(1000)):
    time.sleep(.01)
100%|██████████| 1000/1000 [00:10<00:00, 95.34it/s]

tqdm能处理任何可迭代对象,生成一个迭代器;使用这个迭代器时,显示进度条和完成全部迭代

为了计算剩余时间,tqdm函数要获取可以使用len函数的可迭代对象,或者在第二个参数中指定预期的元素数量。

例如,futures.as_completed函数的结果,就不支持len函数,只能使用tdqm的第二个参数total=来指定数量。

网络下载增加错误处理和进度条

下面的示例中负责下载一个文件的函数(download_one)中使用相同的策略处理HTTP 404错误。其他异常则向上冒泡,交给download_many函数处理。

示例,负责下载的基本函数。

def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = requests.get(url)
    if resp.status_code != 200:
        resp.raise_for_status()  # 如果状态码不是200,产生一个HttpError的异常
    return resp.content
def download_one(cc, base_url, verbose=False):
    try:
        image = get_flag(base_url, cc)
    except requests.exceptions.HTTPError as exc:
        res = exc.response
        if res.status_code == 404:
            status = HTTPStatus.not_found
            msg = 'not found'
        else:  # 如果不是404异常,向上冒泡,传给调用方
            raise
    else:
        sava_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'ok'
    if verbose:
        print(cc, msg)
    return Result(status, cc)

示例,依序下载的download_many函数

def download_many(cc_list, base_url, verbose, max_req):
    """实现依序下载"""
    counter = collections.Counter()  # 统计不同的下载状态:HTTPStatus.ok、HTTPStatus.not_found、HTTPStatus.error
    cc_iter = sorted(cc_list)
    if not verbose:
        cc_iter = tqdm.tqdm(cc_iter)  # 如果不需要详细模式,就使用进度条展示
    for cc in cc_iter:
        try:
            res = download_one(cc, base_url, verbose)
        except requests.exceptions.HTTPError as exc:
            error_msg = "HTTP error {res.status_code} - {res.reason}"
            error_msg = error_msg.format(res=exc.response)
        except requests.exceptions.ConnectionError as exc:
            error_msg = 'Connection error'
        else:
            error_msg = ''
            status = res.status
        if error_msg:
            status = HTTPStatus.error
        counter[status] += 1
        if verbose and error_msg:
            print('*** Error for {}:{}'.format(cc, error_msg))
    return counter

知识点:

requests.exceptions中有所有的requests相关的异常类,可以用来捕获相关异常。

如果有响应信息后,产生的异常,异常对象exc.response的status_code状态码和reason异常原因

示例,多线程下载的download_many函数

default_concur_req = 30  # 默认的线程池大小
max_concur_req = 1000  # 最大并发请求数,这是一个安全措施
def download_many(cc_list, base_url, verbose, concur_req):
    counter = collections.Counter()
    with futures.ThreadPoolExecutor(max_workers=concur_req) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc, base_url, verbose)  # submit排定一个可调用对象的执行时间,返回一个Future实例
            to_do_map[future] = cc  # 把各个Future实例映射到国家代码上,在错误处理时使用
        done_iter = futures.as_completed(to_do_map) # 返回一个迭代器,在future运行结束后产出future
        if not verbose:
            done_iter = tqdm.tqdm(done_iter, total=len(cc_list)) # 如果不是详细模式,就显示进度条,因为done_iter没有len函数,只能通过total参数传入
        for future in done_iter:
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                error_msg = 'HTTP {res.status_code} - {res.reason}'
                error_msg = error_msg.format(res=exc.response)
            except requests.exceptions.ConnectionError as exc:
                error_msg = "Connection error"
            else:
                error_msg = ""
                status = res.status
            if error_msg:
                status = HTTPStatus.error
            counter[status] +=1
            if verbose and error_msg:
                cc = to_do_map[future]
                print('*** Error for {}:{}'.format(cc, error_msg))
        return counter

知识点:

对futures.as_completed函数的惯用法:构建一个字典,把各个future映射到其他数据上,future运行结束后可能会有用。比如上述示例,把future映射到国家代码上。

线程和多进程的代替方案

对于多线程,如果futures.ThreadPoolExecutor类对某个作业来首不够灵活,可能要使用到threading模块中的组件(如Thread、Lock、Semaphore等)自行制定方案,

比如使用queue模块创建线程安全的队列,在线程之间传递数据。futures.ThreadPoolExecutor类已经封装好了这些组件。

对于CPU密集型工作来说,要启动多个进程,规避GIL。创建多个进程的最简单方式是用futures.ProcessPoolExecutor类。如果使用场景较复杂,需要更高级的工具,multiprocessing模块的API和threading模块相仿,不过作业交给多个进程处理。

加载全部内容

相关教程
猜你喜欢
用户评论