While using Python coroutines to download images, the coroutine task count eventually gets stuck and just loops forever (at 97 in one run; in the run below, at 80). I can't figure out where it goes wrong. Does anyone know what's happening? This has been bothering me for a long time.
Here is the output of one run, stuck at a task count of 80:
queue empty check.... 80
queue empty check.... 80
.
.
.
queue empty check.... 80
queue empty check.... 80
Here is the code:
from lxml import etree
import os
import pandas as pd
import asyncio
import aiohttp
from random import randint
import cchardet
import aiofiles
import logging
class sikupicture_Spider(object):
    def __init__(self):
        # self.seens_url = []
        self.loop = asyncio.get_event_loop()
        self.queue = asyncio.PriorityQueue()
        self._workers = 0  # current number of in-flight tasks
        self._max_workers = 150  # maximum number of concurrent tasks
        self.overtime = {}  # {url: times,} retry count for each failed URL
        self.overtime_threshold = 4
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36",
        }
        self.list_content = []

    async def init_url(self):
        # read the seed URLs from the spreadsheet and enqueue them with a random priority
        info = pd.read_excel(r"{}".format(os.path.abspath('moban.xlsx'))).fillna('')
        for ite in info.itertuples():
            await self.queue.put((randint(1, 5), getattr(ite, 'url')))
    async def fetch(self, session, url, timeout, headers=None, binary=False, proxy=None):
        _headers = self.headers
        if headers:
            _headers = headers
        try:
            async with session.get(url, headers=_headers, timeout=timeout, proxy=proxy, allow_redirects=False) as resp:
                status_code = resp.status
                if status_code == 403:
                    print("url-403", url)
                    if url in self.overtime:
                        self.overtime[url] += 1
                        if self.overtime[url] > self.overtime_threshold:
                            pass
                        await self.queue.put((randint(1, 5), url))
                    else:
                        self.overtime[url] = 1
                        await self.queue.put((randint(1, 5), url))
                    status_code = 0
                    html = None
                if binary:
                    text = await resp.read()
                    encoding = cchardet.detect(text)
                    html = text.encode(encoding, errors='ignore')
                else:
                    html = await resp.text()
        except TimeoutError:
            print("url-overtime", url)
            if url in self.overtime:
                self.overtime[url] += 1
                if self.overtime[url] > self.overtime_threshold:
                    pass
                await self.queue.put((randint(1, 5), url))
            else:
                self.overtime[url] = 1
                await self.queue.put((randint(1, 5), url))
            status_code = 0
            html = None
        return status_code, html
    async def download_img(self, session, img_url, timeout, url, headers=None, binary=True, proxy=None):
        _headers = self.headers
        if headers:
            _headers = headers
        try:
            async with session.get(img_url, headers=_headers, timeout=timeout, proxy=proxy, allow_redirects=False) as resp:
                status_code = resp.status
                if binary:
                    html = await resp.read()
                else:
                    html = await resp.text()
        except TimeoutError:
            print("url-overtime", img_url)
            if url in self.overtime:
                self.overtime[url] += 1
                if self.overtime[url] > self.overtime_threshold:
                    pass
                else:
                    await self.queue.put((randint(1, 5), url))
            else:
                self.overtime[url] = 1
                await self.queue.put((randint(1, 5), url))
            status_code = 0
            html = None
        return status_code, html
    def parse_source(self, source):
        try:
            response_1 = etree.HTML(source)
        except Exception as err:
            logging.error(f'parse error:{err}')
            url = ""
        else:
            img_url = response_1.xpath("//a[@href='javascript:;']/@supsrc")[0] if len(
                response_1.xpath("//a[@href='javascript:;']/@supsrc")[0]) else ""
            return img_url
    async def process(self, session, url, timeout):
        status, source = await self.fetch(session, url, timeout)
        file_name = url.replace("http://item.secoo.com/", "").replace(".shtml", "")
        if status == 200:
            img_url = self.parse_source(source)
            img_status, img_source = await self.download_img(session, img_url, timeout, url)
            if img_status == 200:
                async with aiofiles.open("F:\\dawnzhu\\picture\\" + file_name + ".jpg", "wb") as f:
                    await f.write(img_source)
            self._workers -= 1
            print("task done", self._workers, "url_status", status, "img_status", img_status)
        else:
            self._workers -= 1
            print("task done", self._workers, "url_status", status,)
    async def loop_crawl(self):
        await self.init_url()
        timeout = aiohttp.ClientTimeout(total=20)
        conn = aiohttp.TCPConnector(loop=self.loop, limit=50, force_close=True, enable_cleanup_closed=True)
        session = aiohttp.ClientSession(connector=conn, timeout=timeout)
        while True:
            if self._workers >= self._max_workers:
                print("worker limit reached")
                await asyncio.sleep(5)
                continue
            if self.queue.empty():
                print("queue empty check....", self._workers)
                await asyncio.sleep(5)
                if self._workers == 0:
                    break
                continue
            _, url = await self.queue.get()
            asyncio.ensure_future(self.process(session, url, timeout))
            self._workers += 1
            print("queue remaining", self.queue.qsize(), self._workers)
        await session.close()
    def run(self):
        try:
            self.loop.run_until_complete(self.loop_crawl())
        except KeyboardInterrupt:
            self.loop.close()


if __name__ == '__main__':
    sp = sikupicture_Spider()
    sp.run()
1
itskingname 2020-05-27 09:37:02 +08:00
Try changing the limit parameter inside conn = aiohttp.TCPConnector(loop=self.loop, limit=50, force_close=True, enable_cleanup_closed=True) to 500.
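That is, a one-line change in loop_crawl, something like:

    # raise the connector's connection cap from the default-ish 50 to 500
    conn = aiohttp.TCPConnector(loop=self.loop, limit=500, force_close=True, enable_cleanup_closed=True)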
2
dawnzhu OP @itskingname Thanks! I'll give it a try. What would the cause be, too little concurrency?
3
dawnzhu OP @itskingname That didn't work.
4
Vegetable 2020-05-27 09:53:42 +08:00
fetch catches the timeout, but other exceptions can still propagate upward, and an unhandled exception inside a coroutine doesn't terminate the program; it only prints a message like "Task exception was never retrieved". process doesn't catch any exceptions, so once one is raised there, _workers is never decremented and the while loop can never break out. That's the only cause I can see so far. Your code is fun to read, very neatly laid out, but a lot of it is quite low-level, e.g. managing the worker count by hand instead of with a Semaphore, and using aiohttp instead of httpx.
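A minimal sketch of that fix, keeping the rest of the class exactly as posted: wrap the body of process in try/except/finally so _workers is decremented on every exit path, including when parse_source raises (for example the IndexError from indexing an empty xpath result):

    async def process(self, session, url, timeout):
        try:
            status, source = await self.fetch(session, url, timeout)
            file_name = url.replace("http://item.secoo.com/", "").replace(".shtml", "")
            if status == 200:
                img_url = self.parse_source(source)
                img_status, img_source = await self.download_img(session, img_url, timeout, url)
                if img_status == 200:
                    async with aiofiles.open("F:\\dawnzhu\\picture\\" + file_name + ".jpg", "wb") as f:
                        await f.write(img_source)
        except Exception as err:
            # surface the error instead of letting the task die silently with
            # "Task exception was never retrieved"
            logging.error(f'process error: {err} url: {url}')
        finally:
            # runs on every exit path, so loop_crawl's `_workers == 0` check can fire
            self._workers -= 1

An asyncio.Semaphore acquired per task and released in the same finally block would give the same concurrency cap without the hand-maintained counter, which is the Semaphore suggestion above.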