Threads vs. Coroutines and Their Use in Python Crawlers

The focus is on asyncio.

asyncio enables concurrent I/O operations in a single thread. aiohttp, billed as "Asynchronous HTTP Client/Server for asyncio and Python", is an HTTP framework built on top of asyncio.
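
As a minimal sketch of what "single-threaded concurrency" means: two coroutines that each sleep for one second finish in about one second total, because the event loop switches between them while they wait.

import asyncio
import time


async def job(name):
    # asyncio.sleep yields control to the event loop instead of blocking
    await asyncio.sleep(1)
    print(name, 'done')


async def main():
    await asyncio.gather(job('a'), job('b'))


start = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print('elapsed: {:.2f}s'.format(time.time() - start))  # ~1s, not ~2s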

First, install aiohttp:

pip install aiohttp
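
A quick way to confirm the install worked (prints the installed version):

python -c "import aiohttp; print(aiohttp.__version__)"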

Next, write a Flask server to simulate a crawling target. Every route sleeps for 3 seconds before responding, standing in for network and server latency:

from flask import Flask
import time

app = Flask(__name__)


@app.route('/')
def index():
    time.sleep(3)
    return 'Hello, world!'


@app.route('/go')
def go():
    time.sleep(3)
    return 'Hello, go!'


@app.route('/python')
def python():
    time.sleep(3)
    return 'Hello, python!'


@app.route('/c')
def c():
    time.sleep(3)
    return 'Hello, c!'


if __name__ == '__main__':
    # threaded=True lets Flask serve concurrent requests
    app.run(threaded=True, port=5000)
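
With the server running, each route answers after roughly a 3-second delay, which is easy to confirm from the shell:

curl http://127.0.0.1:5000/python
# Hello, python!   (after about 3 seconds)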

Binding a callback. A function attached to a task with add_done_callback runs when the task finishes; it receives the task itself, and task.result() retrieves the coroutine's return value:

import asyncio
import requests


async def request():
    url = 'https://www.baidu.com'
    # requests is blocking; it is used here only to demonstrate callbacks
    response = requests.get(url)
    return response.status_code


def callback(task):
    # task.result() is the return value of request()
    print('Status:', task.result())


coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
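
Note that request() calls the blocking requests.get, so the event loop stalls while it runs; this example gains no concurrency and only demonstrates how add_done_callback hands task.result() to a callback. The sections below show the non-blocking alternatives.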

Multi-threaded coroutine crawler, to test whether the work really runs in parallel. Each worker thread gets its own event loop and a slice of the URL list:

import time
import requests
import asyncio
from multiprocessing.dummy import Pool as ThreadPool

path = ["/go", "/c", "/python", "/"] * 100
host = "http://127.0.0.1:5000"
full_url = list(map(lambda x: f"{host}{x}", path))

thread = 4


async def get_page(url, loop):
    # requests.get is blocking, so run it in the default executor
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    print(response.text)


def divide(i):
    # Each worker thread needs its own event loop (see the note below)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Give every thread an interleaved slice of the URL list
    tasks = [asyncio.ensure_future(get_page(url, loop))
             for url in full_url[i::thread]]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()


if __name__ == '__main__':
    start = time.time()
    pool = ThreadPool(thread)
    pool.map(divide, range(thread))
    pool.close()
    pool.join()
    print("Crawled {0} pages, total time: {1:.2f}s".format(
        len(path), time.time() - start))
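
One detail worth calling out: asyncio creates an event loop automatically only in the main thread, so each worker thread must build and register its own loop with asyncio.new_event_loop() and asyncio.set_event_loop(), as divide() does above.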


The same idea also works with processes instead of threads. aiomultiprocess (installed with pip install aiomultiprocess) provides a process Pool whose workers each run their own event loop, and pool.map dispatches a coroutine function across them:
import asyncio
import aiohttp
import time
from aiomultiprocess import Pool

start = time.time()


async def get(url):
    # async with guarantees the session gets closed
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()


async def request():
    url = 'http://127.0.0.1:5000'
    urls = [url for _ in range(100)]
    async with Pool() as pool:
        result = await pool.map(get, urls)
    return result


if __name__ == '__main__':
    coroutine = request()
    task = asyncio.ensure_future(coroutine)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task)

    end = time.time()
    print('Cost time:', end - start)
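
Because aiomultiprocess spawns worker processes, the entry point must sit behind the if __name__ == '__main__' guard: on platforms that use the spawn start method, the module is re-imported in every worker, and unguarded top-level code would run again there.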

Coroutines with requests via run_in_executor. requests itself is synchronous, but its calls can be handed to an executor and awaited:

import time
import requests
import asyncio

path = ["/go", "/c", "/python", "/"] * 100
host = "http://127.0.0.1:5000"
full_url = list(map(lambda x: f"{host}{x}", path))


async def get_page(url, loop):
    # requests is blocking, so delegate it to the default executor
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    # print(response.text)


if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(get_page(url, loop)) for url in full_url]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    print("Crawled {0} pages, total time: {1:.2f}s".format(
        len(path), time.time() - start))

Output: Crawled 400 pages, total time: 21.20s
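
This version is markedly slower than the pure-aiohttp one below because run_in_executor with no explicit executor uses asyncio's default ThreadPoolExecutor, so the effective concurrency is capped by that pool's worker count rather than by the number of tasks.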

Async coroutine crawler

Since requests does not support asynchronous I/O, switch to aiohttp. The code is as follows:

import time
import aiohttp
import asyncio


async def get_page(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            page_text = await response.text()
            print(page_text)


path = ["/go", "/c", "/python", "/"] * 100
url = "http://127.0.0.1:5000"
full_url = list(map(lambda x: url + x, path))

if __name__ == '__main__':
    # print(full_url)
    start = time.time()
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(get_page(url)) for url in full_url]
    loop.run_until_complete(asyncio.wait(tasks))
    print("Crawled {0} pages, total time: {1:.2f}s".format(
        len(path), time.time() - start))



Output: Crawled 400 pages, total time: 3.35s
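
Launching 400 requests at once can overwhelm a real target (and may exhaust local sockets). A minimal sketch of capping concurrency with asyncio.Semaphore, as a drop-in replacement for get_page above; the limit of 50 is an arbitrary assumption:

import asyncio
import aiohttp

sem = asyncio.Semaphore(50)  # at most 50 requests in flight; tune to taste


async def get_page(url):
    async with sem:  # waits here once 50 requests are already running
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()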

How to parse data: binding callbacks to tasks

import time
import aiohttp
import asyncio


# Callback: parses the response data once the task is done
def callback(task):
    print('this is callback()')
    # Fetch the response data from the finished task
    page_text = task.result()
    print('Parse the data inside the callback here')


async def get_page(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            page_text = await response.text()  # or read() / json()
            return page_text


start = time.time()
path = ["/go", "/c", "/python", "/"] * 100
host = "http://127.0.0.1:5000"
full_url = list(map(lambda x: f"{host}{x}", path))

tasks = []
loop = asyncio.get_event_loop()
for url in full_url:
    c = get_page(url)
    task = asyncio.ensure_future(c)
    # Bind the callback so it can parse the response data
    task.add_done_callback(callback)
    tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
print('Total time:', time.time() - start)
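
The callback above only prints a placeholder. A sketch of doing real parsing there, assuming the pages are HTML; the regex-based title extraction is illustrative, not part of the original:

import re


def callback(task):
    page_text = task.result()
    # Hypothetical parsing step: pull out the <title> tag if there is one
    match = re.search(r'<title>(.*?)</title>', page_text, re.S)
    if match:
        print('title:', match.group(1))
    else:
        print('no title; first 30 chars:', page_text[:30])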

Combined with a proxy pool, this approach can push crawl speed close to its practical limit.
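
A sketch of wiring a proxy in, assuming a hypothetical proxy-pool service at http://127.0.0.1:5555/random that returns one proxy address per request; aiohttp's session.get accepts a proxy= argument:

import aiohttp


async def get_page_via_proxy(url):
    async with aiohttp.ClientSession() as session:
        # Hypothetical proxy-pool endpoint; replace with your own service
        async with session.get('http://127.0.0.1:5555/random') as resp:
            proxy = 'http://' + (await resp.text()).strip()
        async with session.get(url, proxy=proxy) as response:
            return await response.text()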
