Python asyncio: Async Function Call Examples

Timing decorator
import time
import functools

def run_benchmark(func):
    """Decorator that prints how long an async function takes to run."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        res = await func(*args, **kwargs)
        print(f'{func.__name__} run {args} {time.perf_counter() - start_time}')
        return res
    return wrapper

First: synchronous code with ordinary functions
import time

def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    time.sleep(sleep_time)
    print('OK {}'.format(url))

def main(urls):
    for url in urls:
        crawl_page(url)

main(['url_1', 'url_2', 'url_3', 'url_4'])
Synchronous code with async functions

The async keyword declares an asynchronous function, so here both crawl_page and main become async functions. Calling an async function does not run its body; it gives us a coroutine object (coroutine object).

asyncio.run(main()) serves as the entry point of the program; call asyncio.run only once during the lifetime of the program.
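To make the "coroutine object" point concrete, here is a minimal standalone sketch (the greet function is just for illustration, not part of the crawler code):

import asyncio

async def greet():
    return 'hello'

coro = greet()            # calling an async function does NOT run its body
print(coro)               # <coroutine object greet at 0x...>
print(asyncio.run(coro))  # the event loop actually runs the coroutine -> 'hello'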
import asyncio

async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    print(sleep_time)
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))

async def main(urls):
    for url in urls:
        await crawl_page(url)

asyncio.run(main(['url_1', 'url_10', 'url_3', 'url_4']))

await is a synchronous call: crawl_page(url) does not let the next call start until the current one has finished. The effect is therefore exactly the same as the code above (here roughly 1 + 10 + 3 + 4 = 18 seconds in total); we have written synchronous code with an asynchronous interface.
Async function code with tasks
import asyncio
from time_dec import run_benchmark  # the timing decorator above, saved as time_dec.py

@run_benchmark
async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))

@run_benchmark
async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    for task in tasks:
        await task

asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))

Tasks: once we have a coroutine object, we can turn it into a task with asyncio.create_task. A task is scheduled for execution very soon after it is created, so the code does not block at the point of creation. We then only need to wait for all tasks to finish, which we do by awaiting each one in turn (for task in tasks: await task). The result shows that the total runtime equals that of the slowest crawler.
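For reference, the output looks roughly like this (the timing figures are approximate and will vary from run to run):

crawling url_1
crawling url_2
crawling url_3
crawling url_4
OK url_1
crawl_page run ('url_1',) 1.00...
OK url_2
crawl_page run ('url_2',) 2.00...
OK url_3
crawl_page run ('url_3',) 3.00...
OK url_4
crawl_page run ('url_4',) 4.00...
main run (['url_1', 'url_2', 'url_3', 'url_4'],) 4.00...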
Async function tasks with asyncio.gather
import asyncio
from time_dec import run_benchmark

@run_benchmark
async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))

@run_benchmark
async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    await asyncio.gather(*tasks)

asyncio.run(main(['url_1', 'url_10', 'url_3', 'url_4']))

*tasks unpacks the list and turns its elements into positional arguments of the function; correspondingly, **dict unpacks a dictionary into keyword arguments.
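A quick standalone illustration of the unpacking syntax (the show function here is only for demonstration):

def show(a, b, c):
    print(a, b, c)

args = [1, 2, 3]
kwargs = {'a': 1, 'b': 2, 'c': 3}

show(*args)     # the list becomes positional arguments: show(1, 2, 3)
show(**kwargs)  # the dict becomes keyword arguments: show(a=1, b=2, c=3)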
Async code: task scheduling order
import asyncio

async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')

async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')

async def main():
    task1 = asyncio.create_task(worker_1())
    task2 = asyncio.create_task(worker_2())
    print('before await')
    await task1
    print('awaited worker_1')
    await task2
    print('awaited worker_2')

asyncio.run(main())
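The output order is deterministic and makes the scheduling visible: the workers only start running once main() yields control at the first await, and the whole program finishes in about 2 seconds:

before await
worker_1 start
worker_2 start
worker_1 done
awaited worker_1
worker_2 done
awaited worker_2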
Limiting execution time
import asyncio

async def worker_1():
    await asyncio.sleep(1)
    return 1

async def worker_2():
    await asyncio.sleep(2)
    return 2 / 0

async def worker_3():
    await asyncio.sleep(3)
    return 3

async def main():
    task_1 = asyncio.create_task(worker_1())
    task_2 = asyncio.create_task(worker_2())
    task_3 = asyncio.create_task(worker_3())
    await asyncio.sleep(2)
    task_3.cancel()
    res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
    print(res)

asyncio.run(main())

worker_1 runs normally, worker_2 raises an error while running, and worker_3 takes too long and is cancelled by us; all of this is reflected in the final return value res.
Note the return_exceptions=True argument. Without it, the first error is thrown all the way up to the awaiting code, so we would need a try/except to catch it; and once the exception escapes, any tasks that have not finished yet are cancelled when the event loop shuts down. Setting return_exceptions to True avoids that situation.
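For comparison, here is a minimal sketch of the same main() without return_exceptions (the worker functions are unchanged from the example above); worker_2's ZeroDivisionError then propagates out of gather and has to be caught by hand:

async def main():
    task_1 = asyncio.create_task(worker_1())
    task_2 = asyncio.create_task(worker_2())
    task_3 = asyncio.create_task(worker_3())
    await asyncio.sleep(2)
    task_3.cancel()
    try:
        res = await asyncio.gather(task_1, task_2, task_3)  # return_exceptions defaults to False
        print(res)
    except ZeroDivisionError as e:
        # the first exception raised by a task is re-raised here
        print('caught:', e)

asyncio.run(main())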
Practical example: a crawler

Synchronous version. The crawler first fetches each movie's link, title, and release date, then makes a second request to fetch the poster image.
import requests
from bs4 import BeautifulSoup

def main():
    url = "https://movie.douban.com/cinema/later/beijing/"
    headers_ = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36"}
    init_page = requests.get(url, headers=headers_).content
    init_soup = BeautifulSoup(init_page, 'lxml')
    all_movies = init_soup.find('div', id="showing-soon")
    for each_movie in all_movies.find_all('div', class_="item"):
        all_a_tag = each_movie.find_all('a')
        all_li_tag = each_movie.find_all('li')
        movie_name = all_a_tag[1].text
        url_to_fetch = all_a_tag[1]['href']
        movie_date = all_li_tag[0].text
        response_item = requests.get(url_to_fetch, headers=headers_).content
        soup_item = BeautifulSoup(response_item, 'lxml')
        img_tag = soup_item.find('img')
        print('{} {} {}'.format(movie_name, movie_date, img_tag['src']))

main()

Asynchronous version
import asyncio
import aiohttp
from bs4 import BeautifulSoup

headers_ = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.81 Safari/537.36"}

async def fetch_content(url):
    async with aiohttp.ClientSession(
        headers=headers_, connector=aiohttp.TCPConnector(ssl=False)
    ) as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    url = "https://movie.douban.com/cinema/later/beijing/"
    init_page = await fetch_content(url)
    init_soup = BeautifulSoup(init_page, 'lxml')
    movie_names, urls_to_fetch, movie_dates = [], [], []
    all_movies = init_soup.find('div', id="showing-soon")
    count = 0
    for each_movie in all_movies.find_all('div', class_="item"):
        all_a_tag = each_movie.find_all('a')
        all_li_tag = each_movie.find_all('li')
        movie_names.append(all_a_tag[1].text)
        urls_to_fetch.append(all_a_tag[1]['href'])
        movie_dates.append(all_li_tag[0].text)
        count += 1
        if count == 2:  # only fetch the first two detail pages for this demo
            break
    # tasks = [fetch_content(url) for url in urls_to_fetch]
    tasks = [asyncio.create_task(fetch_content(url)) for url in urls_to_fetch]
    pages = await asyncio.gather(*tasks)
    for movie_name, movie_date, page in zip(movie_names, movie_dates, pages):
        soup_item = BeautifulSoup(page, 'lxml')
        img_tag = soup_item.find('img')
        print('{} {} {}'.format(movie_name, movie_date, img_tag['src']))

asyncio.run(main())

- Coroutines differ from multithreading in two main ways: a coroutine program runs in a single thread, and it is the user who decides where to hand over control and switch to the next task.
- Coroutine code is also cleaner and clearer; combining the async/await syntax with create_task easily covers small and medium-scale concurrency needs.
- When writing coroutine code, keep a clear picture of the event loop in mind: know where the program needs to pause and wait for I/O, and where it should run straight through to the end.
Event loop and async functions

On Python 3.7+ you can use asyncio.run(main()). If you are running a Python version older than 3.7, asyncio.run does not exist yet, so you need to replace asyncio.run(main()) with the following:

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Otherwise you will get an error message:

AttributeError: module 'asyncio' has no attribute 'run'
import os
import random
import asyncio
import aiohttp
from aiofile import async_open
from tqdm import tqdm

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36'
}
FILE_STORAGE = 'file_storage'  # folder where downloaded files are saved
if not os.path.exists(FILE_STORAGE):
    os.makedirs(FILE_STORAGE)

# concurrency limit; create it once so that it is shared by all download tasks
semaphore = asyncio.Semaphore(2)

async def download_file(url: str):
    async with semaphore:
        conn = aiohttp.TCPConnector(limit=2)
        async with aiohttp.ClientSession(headers=headers, connector=conn) as session:
            async with session.get(url) as response:
                # tqdm gives a progress bar in the terminal; purely cosmetic
                total = int(response.headers['Content-Length']) if response.headers.get('Content-Length') else None
                pbar = tqdm(total=total, desc=url)
                # the tests all download the same file, so prepend a random string to the name
                random_filename = ''.join(random.sample('zyxwvutsrqponmlkjihgfedcba', 5))
                filename = os.path.basename(url)
                async with async_open('./{}/{}'.format(FILE_STORAGE, random_filename + filename), 'wb') as afp:
                    async for chunk, _ in response.content.iter_chunks():
                        pbar.update(len(chunk))
                        await afp.write(chunk)

if __name__ == '__main__':
    # url = "http://127.0.0.1:5000"
    # path = ["/image"] * 1000
    # path = ["/filedown"] * 1000
    # full_url = list(map(lambda x: url + x, path))
    # full_url = ["https://edu-files-1251502357.cos.ap-shanghai.myqcloud.com/CourseTeacher_2.5.0.156_DailyBuild.dmg"] * 20
    full_url = ["http://127.0.0.1:5000/filedown"] * 20
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(download_file(f"{path}")) for path in full_url]
    loop.run_until_complete(asyncio.gather(*tasks))
Unless otherwise stated, all articles on this blog are licensed under CC BY-SA 4.0. Please credit the source when reposting.