Python asyncio: Asynchronous Function Call Examples

Asynchronous function call examples

  • A timing decorator

    import time
    import functools


    def run_benchmark(func):
        """Decorator that reports how long an async function takes to run."""
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.perf_counter()
            res = await func(*args, **kwargs)
            print(f'{func.__name__} run {args} {time.perf_counter() - start_time}')
            return res

        return wrapper


  • First, the synchronous version with ordinary functions

    import time

    def crawl_page(url):
        print('crawling {}'.format(url))
        sleep_time = int(url.split('_')[-1])
        time.sleep(sleep_time)
        print('OK {}'.format(url))

    def main(urls):
        for url in urls:
            crawl_page(url)

    main(['url_1', 'url_2', 'url_3', 'url_4'])
  • Async functions, still running synchronously

    The async keyword declares an asynchronous function, so both crawl_page and main here become asynchronous functions. Calling an asynchronous function does not run its body; it gives us a coroutine object (a small sketch of this follows the example below).

    asyncio.run(main()) serves as the entry point of the program; within the program's lifetime, asyncio.run should be called only once.

    import asyncio


    async def crawl_page(url):
        print('crawling {}'.format(url))
        sleep_time = int(url.split('_')[-1])
        print(sleep_time)
        await asyncio.sleep(sleep_time)
        print('OK {}'.format(url))


    async def main(urls):
        for url in urls:
            await crawl_page(url)


    asyncio.run(main(['url_1', 'url_10', 'url_3', 'url_4']))

    await is a synchronous call: crawl_page(url) will not let the next call start until the current one has finished. The effect is therefore exactly the same as the version above; we have merely written synchronous code with an asynchronous interface.
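
    As a minimal sketch of the coroutine-object point above (the hello function here is made up purely for illustration), calling an async function only creates a coroutine object; nothing runs until that object is handed to asyncio.run or awaited:

    import asyncio

    async def hello():
        return 'hi'

    coro = hello()            # only creates a coroutine object; the body has not run yet
    print(type(coro))         # <class 'coroutine'>
    print(asyncio.run(coro))  # the event loop actually runs it and prints 'hi'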

  • Concurrent execution with tasks

    import asyncio

    from time_dec import run_benchmark


    @run_benchmark
    async def crawl_page(url):
        print('crawling {}'.format(url))
        sleep_time = int(url.split('_')[-1])
        await asyncio.sleep(sleep_time)
        print('OK {}'.format(url))


    @run_benchmark
    async def main(urls):
        tasks = [asyncio.create_task(crawl_page(url)) for url in urls]

        for task in tasks:
            await task


    asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))

    The task concept: once we have a coroutine object, we can create a task from it with asyncio.create_task. A task is scheduled to run soon after it is created, so our code does not block at that point. We then only need to wait for all tasks to finish, which `for task in tasks: await task` takes care of.

    The output shows that the total running time equals the time taken by the slowest crawler.
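
    With the sleep times of 1, 2, 3 and 4 seconds implied by url_1 through url_4, the four tasks run concurrently, so the total is roughly max(1, 2, 3, 4) = 4 seconds rather than the 1 + 2 + 3 + 4 = 10 seconds of the synchronous version.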

  • Tasks with asyncio.gather

    import asyncio
    from time_dec import run_benchmark


    @run_benchmark
    async def crawl_page(url):
        print('crawling {}'.format(url))
        sleep_time = int(url.split('_')[-1])
        await asyncio.sleep(sleep_time)
        print('OK {}'.format(url))


    @run_benchmark
    async def main(urls):
        tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
        await asyncio.gather(*tasks)


    asyncio.run(main(['url_1', 'url_10', 'url_3', 'url_4']))

    *tasks unpacks the list, turning its elements into positional arguments of the function; the counterpart is **dict, which turns a dictionary into keyword arguments.
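
    A minimal sketch of both forms of unpacking (the show function is invented for illustration):

    def show(a, b, c):
        print(a, b, c)

    args = [1, 2, 3]
    kwargs = {'a': 1, 'b': 2, 'c': 3}
    show(*args)     # unpacks the list into positional arguments: show(1, 2, 3)
    show(**kwargs)  # unpacks the dict into keyword arguments: show(a=1, b=2, c=3)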

  • How tasks are scheduled


    import asyncio


    async def worker_1():
        print('worker_1 start')
        await asyncio.sleep(1)
        print('worker_1 done')


    async def worker_2():
        print('worker_2 start')
        await asyncio.sleep(2)
        print('worker_2 done')


    async def main():
        # both workers are scheduled as soon as the tasks are created
        task1 = asyncio.create_task(worker_1())
        task2 = asyncio.create_task(worker_2())
        print('before await')
        await task1              # main yields control; the event loop runs both workers
        print('awaited worker_1')
        await task2
        print('awaited worker_2')


    asyncio.run(main())
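
    Reasoning from how create_task schedules work (this is the expected order, not captured output), the prints should appear roughly as follows, and the whole run takes about 2 seconds instead of 1 + 2 = 3:

    before await
    worker_1 start
    worker_2 start
    worker_1 done        (after ~1 s)
    awaited worker_1
    worker_2 done        (after ~2 s)
    awaited worker_2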
  • Cancelling long-running tasks and collecting errors

    import asyncio


    async def worker_1():
        await asyncio.sleep(1)
        return 1


    async def worker_2():
        await asyncio.sleep(2)
        return 2 / 0


    async def worker_3():
        await asyncio.sleep(3)
        return 3


    async def main():
        task_1 = asyncio.create_task(worker_1())
        task_2 = asyncio.create_task(worker_2())
        task_3 = asyncio.create_task(worker_3())

        await asyncio.sleep(2)
        task_3.cancel()

        res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
        print(res)


    asyncio.run(main())

    worker_1 runs normally, worker_2 raises an error while running, and worker_3 takes too long and gets cancelled by us; all of this information ends up in the final return value res.

    Note the return_exceptions=True argument. Without it, the first exception is thrown straight up to this calling level, so we would need try/except to catch it, and the results of the remaining tasks would never be collected by this call. Setting return_exceptions to True avoids that: exceptions are returned in res alongside the normal results.
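
    For comparison, a rough sketch (this variant is mine, not from the original) of what the call looks like without return_exceptions=True: the ZeroDivisionError from worker_2 propagates out of gather and has to be caught by the caller.

    # reusing worker_1 and worker_2 from the example above
    async def main():
        task_1 = asyncio.create_task(worker_1())
        task_2 = asyncio.create_task(worker_2())
        try:
            res = await asyncio.gather(task_1, task_2)
            print(res)
        except ZeroDivisionError as exc:
            # the exception raised inside worker_2 surfaces here, about 2 seconds in
            print('caught:', exc)


    asyncio.run(main())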

  • A practical example: a crawler

    Synchronous version: the crawler first fetches each movie's link, name and date, and then makes a second request per movie to fetch the poster image.

    import requests
    from bs4 import BeautifulSoup


    def main():
        url = "https://movie.douban.com/cinema/later/beijing/"
        headers_ = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36"}
        init_page = requests.get(url, headers=headers_).content
        init_soup = BeautifulSoup(init_page, 'lxml')

        all_movies = init_soup.find('div', id="showing-soon")
        for each_movie in all_movies.find_all('div', class_="item"):
            all_a_tag = each_movie.find_all('a')
            all_li_tag = each_movie.find_all('li')

            movie_name = all_a_tag[1].text
            url_to_fetch = all_a_tag[1]['href']
            movie_date = all_li_tag[0].text

            response_item = requests.get(url_to_fetch, headers=headers_).content
            soup_item = BeautifulSoup(response_item, 'lxml')
            img_tag = soup_item.find('img')

            print('{} {} {}'.format(movie_name, movie_date, img_tag['src']))


    main()

    Asynchronous version

    import asyncio
    import aiohttp

    from bs4 import BeautifulSoup

    headers_ = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36"}


    async def fetch_content(url):
        async with aiohttp.ClientSession(
                headers=headers_, connector=aiohttp.TCPConnector(ssl=False)
        ) as session:
            async with session.get(url) as response:
                return await response.text()


    async def main():
        url = "https://movie.douban.com/cinema/later/beijing/"
        init_page = await fetch_content(url)

        init_soup = BeautifulSoup(init_page, 'lxml')
        movie_names, urls_to_fetch, movie_dates = [], [], []

        all_movies = init_soup.find('div', id="showing-soon")
        count = 0
        for each_movie in all_movies.find_all('div', class_="item"):
            all_a_tag = each_movie.find_all('a')
            all_li_tag = each_movie.find_all('li')

            movie_names.append(all_a_tag[1].text)
            urls_to_fetch.append(all_a_tag[1]['href'])
            movie_dates.append(all_li_tag[0].text)
            count += 1
            if count == 2:  # only crawl the first two movies while testing
                break

        # tasks = [fetch_content(url) for url in urls_to_fetch]
        tasks = [asyncio.create_task(fetch_content(url)) for url in urls_to_fetch]
        pages = await asyncio.gather(*tasks)

        for movie_name, movie_date, page in zip(movie_names, movie_dates, pages):
            soup_item = BeautifulSoup(page, 'lxml')
            img_tag = soup_item.find('img')

            print('{} {} {}'.format(movie_name, movie_date, img_tag['src']))


    asyncio.run(main())

    • The difference between coroutines and multithreading comes down to two points: first, coroutines run in a single thread; second, with coroutines the user decides where to hand over control and switch to the next task.
    • Coroutine code is more concise and readable; combining async / await syntax with create_task comfortably covers small and medium scale concurrency needs.
    • When writing coroutine code, keep a clear picture of the event loop in mind: know where the program needs to pause and wait for I/O, and where it should run straight through to the end.
  • Running async functions on an event loop (pre-3.7)

    On Python 3.7+ you can simply use asyncio.run(main()).

    If you are running a Python version older than 3.7, you need to replace asyncio.run(main()) with the following:

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    Otherwise you will get the error message: AttributeError: module 'asyncio' has no attribute 'run'

    import os
    import random
    import asyncio
    import aiohttp
    from aiofile import async_open
    from tqdm import tqdm

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36'
    }
    FILE_STORAGE = 'file_storage'  # folder that downloaded files are saved into
    if not os.path.exists(FILE_STORAGE):
        os.makedirs(FILE_STORAGE)

    # shared by all tasks so that at most two downloads run at the same time
    semaphore = asyncio.Semaphore(2)


    async def download_file(url: str):
        async with semaphore:
            conn = aiohttp.TCPConnector(limit=2)
            async with aiohttp.ClientSession(headers=headers, connector=conn) as session:
                async with session.get(url) as response:
                    # tqdm progress bar for the terminal; total may be unknown
                    content_length = response.headers.get('Content-Length')
                    pbar = tqdm(total=int(content_length) if content_length else None, desc=url)

                    # the test URLs all point to the same file, so prefix the name with random letters
                    random_filename = ''.join(random.sample('zyxwvutsrqponmlkjihgfedcba', 5))
                    filename = os.path.basename(url)

                    async with async_open('./{}/{}'.format(FILE_STORAGE, random_filename + filename), 'wb') \
                            as afp:
                        async for chunk, _ in response.content.iter_chunks():
                            pbar.update(len(chunk))
                            await afp.write(chunk)


    if __name__ == '__main__':
        # url = "http://127.0.0.1:5000"
        # path = ["/image"] * 1000
        # path = ["/filedown"] * 1000
        # full_url = list(map(lambda x: url + x, path))

        # full_url = ["https://edu-files-1251502357.cos.ap-shanghai.myqcloud.com/CourseTeacher_2.5.0.156_DailyBuild.dmg"] * 20
        full_url = ["http://127.0.0.1:5000/filedown"] * 20
        loop = asyncio.get_event_loop()
        tasks = [asyncio.ensure_future(download_file(f"{path}")) for path in full_url]
        loop.run_until_complete(asyncio.gather(*tasks))

Asynchronous I/O

Python核心技术编程与实践


Unless otherwise stated, all articles on this blog are licensed under CC BY-SA 4.0; please credit the source when reposting!