使用 aiohttp 试着写了一个爬虫,但是发现可能会出现 一级页面还在抓取的时候,由于队列为空,直接退出的情况。不知该如何去做这个判断?另外不知以下代码这么写是否有其他的问题??
# coding:utf-8
import asyncio
import aiohttp
class Task(object):
def __init__(self, info, priority):
self.priority = priority
self.info = info
def __lt__(self, other):
return self.priority < other.priority
class Spider(object):
def __init__(self, loop=None):
self.loop = loop
conn = aiohttp.TCPConnector(limit=3)
self.session = aiohttp.ClientSession(loop=loop, connector=conn)
self.queue = asyncio.PriorityQueue()
def start(self, page):
task_info = {'callback': self.parse_1, 'page': page}
return task_info
async def set_item(self, task):
pass
async def fetch(self, task):
await asyncio.sleep(2)
task['callback'](task['page'])
async def worker(self):
while True:
next_task = await self.queue.get()
if next_task.info.get('type') == 'item':
asyncio.ensure_future(self.set_item(next_task.info))
else:
asyncio.ensure_future(self.fetch(next_task.info))
self.queue.task_done()
# if self.queue.empty():
# await asyncio.sleep(1)
# if self.queue.empty():
# break
def run(self):
for page in range(1, 10):
self.queue.put_nowait(Task(self.start(page), 0))
self.loop.run_until_complete(self.worker())
def close(self):
if not self.session.closed:
if self.session._connector_owner:
self.session._connector.close()
self.session._connector = None
def parse_1(self, meta):
print('parse_1-----', meta)
for page in range(20, 30):
task = {'callback': self.parse_2, 'page': page}
self.queue.put_nowait(Task(task, 1))
def parse_2(self, meta):
print('parse2----', meta)
for page in range(30, 40):
task = {'callback': self.parse_3, 'page': page}
self.queue.put_nowait(Task(task, 0))
def parse_3(self, meta):
print('parse3----', meta)
loop = asyncio.get_event_loop()
sp = Spider(loop=loop)
sp.run()
sp.close()
1
jimmyczm 2018-10-25 20:37:22 +08:00
我看别人写的是先把队列放到 redis 里,对 redis 进行判断从而进行开始或终止
|
2
zhijiansha OP @jimmyczm 感觉用 redis 应该也会有这个问题。。。除非是标记一下任务状态
|
3
AlisaDestiny 2018-10-25 23:45:37 +08:00
woker 函数里别 while True;弄一个标记,如果为 True 继续,为 False 就停止,
|
4
zhijiansha OP @AlisaDestiny 但爬取何时完成,无法预料,也就无法给设置为 False 啊
|
5
binux 2018-10-26 00:51:01 +08:00
加个 work in progress 标记
|
6
so1n 2018-10-26 01:03:02 +08:00 via Android
手机看的好难受,没看完,你试试 work 那里使用 wait_for 替换 ensure_future
|
7
Yourshell 2018-10-26 09:02:40 +08:00 via iPhone
判断任务队列用 join 而不是 empty
|
8
zhijiansha OP |
9
Yourshell 2018-10-26 10:35:53 +08:00 via iPhone
@zhijiansha empty 是判断队列是否为空,join 是阻塞至所有任务完成,也就是调用 task_done。你用 empty 判断队列为空,只是所有的任务都被 get 了,不代表已经完成了。你可以看看官方的例子 https://docs.python.org/3/library/asyncio-queue.html
|
10
binux 2018-10-26 11:08:09 +08:00
@zhijiansha #8 任何 worker 没处理完之前都是 WIP 啊,除了要判断队列是否为空,还要判断是否有任务是 WIP
|
11
zhijiansha OP |
12
zhijiansha OP ```
async def worker(self): """ 任务调度 :return: """ while True: if not self.queue.empty(): next_data = await self.queue.get() task_id = uuid.uuid1() self.task_running_list.append(task_id) if isinstance(next_data, Item): asyncio.create_task(self.set_item(task_id, next_data.info)) else: asyncio.create_task(self.fetch(task_id, next_data.info)) self.queue.task_done() else: await asyncio.sleep(0) if self.queue.empty() and not self.task_running_list: break ``` 根据 @binux 的提示,这么处理可解决该问题。 |
13
binux 2018-10-29 11:41:02 +08:00
@zhijiansha #12 处理完要把 task_id 删掉啊
|
14
zhijiansha OP @binux 嗯嗯,删除操作是在 fetch 方法里面执行的
|
15
a65420321a 2018-10-30 16:11:57 +08:00
怎么贴的代码?
|
16
Harlaus 2018-11-14 16:06:23 +08:00
建议 aio+mq
|