This post focuses on asyncio, which enables concurrent IO on a single thread. aiohttp ("Asynchronous HTTP Client/Server for asyncio and Python") is an HTTP framework built on top of asyncio. First, install aiohttp with `pip install aiohttp`.
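Before the crawler examples, here is a minimal sketch of what "single-threaded concurrent IO" means; the coroutine names and delays are illustrative only:

```python
import asyncio

async def fetch(name, delay):
    # asyncio.sleep() stands in for a non-blocking IO wait, e.g. a network call.
    await asyncio.sleep(delay)
    print(f'{name} finished after {delay}s')

async def main():
    # Both coroutines run on one thread; their waits overlap,
    # so this takes about 2s in total rather than 1s + 2s.
    await asyncio.gather(fetch('a', 1), fetch('b', 2))

asyncio.run(main())
```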
Write a Flask server to simulate the target of a web crawler. Each route sleeps for 3 seconds to mimic a slow response, and `threaded=True` lets Flask serve concurrent requests, so any speedup we measure comes from the client side:

```python
from flask import Flask
import time

app = Flask(__name__)

@app.route('/')
def index():
    time.sleep(3)
    return 'Hello, world!'

@app.route('/go')
def go():
    time.sleep(3)
    return 'Hello, go!'

@app.route('/python')
def python():
    time.sleep(3)
    return 'Hello, python!'

@app.route('/c')
def c():
    time.sleep(3)
    return 'Hello, c!'

if __name__ == '__main__':
    app.run(threaded=True, port=5000)
```
Binding a callback

```python
import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

def callback(task):
    # Runs once the task finishes; task.result() is the coroutine's return value.
    print('Status:', task.result())

coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
```

Note that `requests.get` is a blocking call, so this example only demonstrates how `add_done_callback` fires when a task completes; it does not actually run concurrently.
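One detail worth knowing about `add_done_callback`: the event loop passes only the finished task to the callback. If the callback needs extra context, the usual workaround is `functools.partial`; a minimal sketch (the `tag` argument is illustrative):

```python
import asyncio
import functools

async def request():
    await asyncio.sleep(1)
    return 200

def callback(tag, task):
    # `tag` is bound via functools.partial; `task` is supplied by the event loop.
    print(tag, task.result())

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(request())
task.add_done_callback(functools.partial(callback, 'status:'))
loop.run_until_complete(task)
```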
A multi-threaded coroutine crawler, to test whether the work really runs in parallel:

```python
import time
import requests
import asyncio
from multiprocessing.dummy import Pool as ThreadPool

path = ["/go", "/c", "/python", "/"] * 100
host = "http://127.0.0.1:5000"
full_url = list(map(lambda x: f"{host}{x}", path))

thread = 4

async def get_page(url, loop):
    # requests is blocking, so hand it to the loop's default thread pool executor.
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    print(response.text)

def divide(i):
    # Each worker thread runs its own event loop over its slice of the URLs.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    tasks = [asyncio.ensure_future(get_page(url, loop))
             for url in full_url[i::thread]]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

if __name__ == '__main__':
    start = time.time()
    pool = ThreadPool(thread)
    pool.map(divide, range(thread))
    pool.close()
    pool.join()
    print("Crawled {0} pages, total time: {1:.2f}s".format(
        len(path), time.time() - start))
```
aiomultiprocess combines multiprocessing with asyncio: each worker process runs its own event loop, and `pool.map` distributes the coroutines across processes:

```python
import asyncio
import aiohttp
import time
from aiomultiprocess import Pool

async def get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def request():
    url = 'http://127.0.0.1:5000'
    urls = [url for _ in range(100)]
    async with Pool() as pool:
        result = await pool.map(get, urls)
    return result

if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(request())
    end = time.time()
    print('Cost time:', end - start)
```
Asynchronous coroutines driving requests

```python
import time
import requests
import asyncio

path = ["/go", "/c", "/python", "/"] * 100
host = "http://127.0.0.1:5000"
full_url = list(map(lambda x: f"{host}{x}", path))

async def get_page(url, loop):
    # requests cannot be awaited directly, so run it in the default executor.
    future = loop.run_in_executor(None, requests.get, url)
    response = await future

if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(get_page(url, loop)) for url in full_url]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    print("Crawled {0} pages, total time: {1:.2f}s".format(
        len(path), time.time() - start))
```
Output: Crawled 400 pages, total time: 21.20s
Asynchronous coroutine crawler

Since requests does not support asyncio, we switch to aiohttp. The code is as follows:

```python
import time
import aiohttp
import asyncio

async def get_page(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url=url) as response:
            page_text = await response.text()
            print(page_text)

path = ["/go", "/c", "/python", "/"] * 100
url = "http://127.0.0.1:5000"
full_url = list(map(lambda x: url + x, path))

if __name__ == '__main__':
    print(full_url)
    start = time.time()
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(get_page(url)) for url in full_url]
    loop.run_until_complete(asyncio.wait(tasks))
    print("Crawled {0} pages, total time: {1:.2f}s".format(
        len(path), time.time() - start))
```
Output: Crawled 400 pages, total time: 3.35s
How to parse the data: binding a callback to each task

```python
import time
import aiohttp
import asyncio

def callback(task):
    print('this is callback()')
    # task.result() holds the page text returned by get_page().
    page_text = task.result()
    print('Parse the data here, inside the callback')

async def get_page(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url=url) as response:
            page_text = await response.text()
            return page_text

start = time.time()
path = ["/go", "/c", "/python", "/"] * 100
host = "http://127.0.0.1:5000"
full_url = list(map(lambda x: f"{host}{x}", path))

tasks = []
loop = asyncio.get_event_loop()
for url in full_url:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    task.add_done_callback(callback)
    tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
print('Total time:', time.time() - start)
```
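The callback above only prints a placeholder message. As a minimal sketch of what actual parsing could look like against the demo Flask server (the regex and the `parse_callback` name are illustrative, not from the original), any synchronous extraction logic can run inside the callback:

```python
import re

def parse_callback(task):
    page_text = task.result()
    # Hypothetical parsing step: extract the word after "Hello," from the
    # demo server's response, e.g. "Hello, python!" -> "python".
    match = re.search(r'Hello\s*,\s*(\w+)!', page_text)
    if match:
        print('Parsed:', match.group(1))
```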
Combined with a proxy pool, the crawler's speed can be pushed to its limit.
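For example, aiohttp accepts an HTTP proxy per request via the `proxy` argument of `session.get`; a minimal sketch, assuming a hypothetical list of local proxy endpoints:

```python
import random
import asyncio
import aiohttp

# Hypothetical proxy endpoints; in practice these would come from a proxy-pool service.
PROXIES = [
    "http://127.0.0.1:7890",
    "http://127.0.0.1:7891",
]

async def get_page(url):
    proxy = random.choice(PROXIES)  # rotate proxies across requests
    async with aiohttp.ClientSession() as session:
        # aiohttp supports per-request HTTP proxies via the `proxy` keyword.
        async with session.get(url, proxy=proxy) as response:
            return await response.text()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(get_page("http://127.0.0.1:5000")))
```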