V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
ohayoo
V2EX  ›  Python

请大佬帮忙瞄一眼我这个丑陋的异步协程代码

  •  
  •   ohayoo · 2023-07-28 15:20:13 +08:00 · 2387 次点击
    这是一个创建于 484 天前的主题,其中的信息可能已经有所发展或是发生改变。

    迫于机器配置太低,用多进程多线程,一秒钟才处理几百条 uri ,于是想着来用异步协程来试下,看着文档写出了这样一个丑陋的代码,搞了几万条 uri 测试了下,好像也没啥问题,不打印结果到屏幕的话,一秒钟差不多可以处理 1000 条,大概有这么几个步骤:

    • 1 、uris.txt 有几千万条 uri ,于是每次读 1000 行,避免内存占用过多
    • 2 、利用 Semaphore 来控制并发数量为 100 ,避免 API 端把我给 ban 了
    • 3 、复用 session

    我现在的困惑是:

    • 1 、我这样写,上面 3 点真的有达到目的吗?
    • 2 、最后面 if name == 'main'下面那一坨,在 for 循环里面写 asyncio.run()总觉得怪怪的,但是不这样写,又不知道要怎么写
    • 3 、弱弱的再问个小白问题,不是说事件循环才是 asyncio 的核心嘛?可我这里面也没用 asyncio.get_event_loop(),为啥也能跑得这么顺畅呢?
    • 4 、如果要让代码优雅一点应该怎么修改呢?

    耽误大佬周五下午一点点时间,帮忙瞅一眼,不胜感激!

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    
    import asyncio
    from asyncio import Semaphore
    from aiohttp import ClientSession
    from itertools import islice
    
    
    def get_lines_iterator(filename, n=1000):
        with open(filename) as fp:
            while True:
                lines = list(islice(fp, n))
                if lines:
                    yield lines
                else:
                    break
    
    
    async def delete_file(uri: str,
                          session: ClientSession,
                          sem: Semaphore) -> int:
        headers = {'Authorization': 'xxxxxxxxxxx'}
        url = api + uri
        async with sem, session.delete(url, headers=headers) as response:
            return uri, response.status
    
    
    # 写法 1:
    # async def main(uris):
    #     sem = Semaphore(100)
    #     async with ClientSession() as session:
    #         tasks = [delete_file(uri, session, sem) for uri in uris]
    #         await asyncio.gather(*tasks)
    
    
    # 写法 2:
    async def main(uris):
        sem = Semaphore(100)
        async with ClientSession() as session:
            async with asyncio.TaskGroup() as group:
                result = [group.create_task(delete_file(
                    uri, session, sem)) for uri in uris]
                return result
    
    
    if __name__ == '__main__':
        for lines in get_lines_iterator("uris.txt"):
            uris = [uri.strip() for uri in lines]
            result = asyncio.run(main(uris))
            for x in result:
                print(x.result())
    
    19 条回复    2023-08-02 13:37:54 +08:00
    fzzff
        1
    fzzff  
       2023-07-28 15:56:48 +08:00   ❤️ 1
    以下是对代码进行优化的建议:

    1. 使用异步文件读取:可以使用`aiofiles`库来实现异步文件读取,从而避免阻塞事件循环。这将使得文件的读取操作也能并发进行,提高效率。

    2. 使用异步上下文管理器:`aiohttp`支持异步上下文管理器,你可以使用`async with`语法来创建`ClientSession`,这样会更加简洁,而且会在任务完成后自动关闭会话。

    3. 使用`asyncio.as_completed`:在并发执行任务时,可以使用`asyncio.as_completed`来获取已完成的任务,而不是等待所有任务都完成再处理结果。这样可以更早地得到一部分结果,并在需要时立即处理。

    4. 异常处理:对于异步代码,异常处理十分重要。可以在任务执行时捕获异常,并记录错误信息,以便后续分析和处理。

    下面是优化后的代码:

    ```python
    import asyncio
    import aiofiles
    from aiohttp import ClientSession

    async def delete_file(session: ClientSession, sem: asyncio.Semaphore, uri: str):
    headers = {'Authorization': 'xxxxxxxxxxx'}
    url = api + uri
    try:
    async with sem:
    async with session.delete(url, headers=headers) as response:
    return uri, response.status
    except Exception as e:
    # 处理异常,比如记录错误日志
    print(f"Error occurred while processing {uri}: {str(e)}")
    return uri, None

    async def main(uris):
    sem = asyncio.Semaphore(100)
    async with ClientSession() as session:
    tasks = [delete_file(session, sem, uri) for uri in uris]
    for future in asyncio.as_completed(tasks):
    uri, status = await future
    if status is not None:
    print(f"{uri}: {status}")
    else:
    print(f"{uri}: Error")

    async def read_uris(filename):
    async with aiofiles.open(filename, mode='r') as fp:
    async for line in fp:
    yield line.strip()

    if __name__ == '__main__':
    asyncio.run(main(read_uris("uris.txt")))
    ```

    在优化后的代码中,我们使用`aiofiles`库来异步读取文件,并使用`async for`来逐行获取 URI 。同时,我们使用`asyncio.as_completed`来处理已完成的任务,这样在某些任务执行较慢时,可以更早地输出结果,提高实时性。另外,我们在`delete_file`函数中增加了异常处理,确保在出现异常时不会导致整个任务中断。
    fzzff
        2
    fzzff  
       2023-07-28 16:01:37 +08:00
    以上是 chatgpt 给出的代码优化建议, 另外个人建议你把请求库替换成 httpx
    zong400
        3
    zong400  
       2023-07-28 16:03:11 +08:00
    api 端只能并发 100 ?那你代码优化得再好也没用啊,多部署几个在不同 ip 的服务器吧
    wuwukai007
        4
    wuwukai007  
       2023-07-28 16:05:50 +08:00
    api 并发 100 ,线程 100 并发不是绰绰有余吗?
    fzzff
        5
    fzzff  
       2023-07-28 16:14:01 +08:00   ❤️ 1
    asyncio.run(main(uris))无论如何不应该放到循环里, 每次 asyncio.run()会创建一个新的事件循环, 你这个代码跑完创建了一堆事件循环同时运行性能必然受损, 而且你的 result 接收到的也不是完整的结果, 如果你想并发运行一组任务应该用 asyncio.gather
    ohayoo
        6
    ohayoo  
    OP
       2023-07-28 16:17:39 +08:00 via Android
    @zong400 没有,api 端没有说限制 100 ,只是我自己觉得太高了,估计会 ban 我,所以先填个 100 试一试
    ohayoo
        7
    ohayoo  
    OP
       2023-07-28 16:19:11 +08:00 via Android
    @fzzff 我也觉得放到 for 循环里面肯定不对
    liuwei889
        8
    liuwei889  
       2023-07-28 16:19:53 +08:00
    @fzzff 我也想到 chatgpt 优化了,啊哈哈哈
    ohayoo
        9
    ohayoo  
    OP
       2023-07-28 16:24:29 +08:00 via Android
    手动 @下 @ClericPy 大佬帮忙看看
    zzl22100048
        10
    zzl22100048  
       2023-07-28 17:03:46 +08:00   ❤️ 1
    fzinfz
        11
    fzinfz  
       2023-07-28 17:12:49 +08:00 via iPhone   ❤️ 1
    Maerd
        12
    Maerd  
       2023-07-28 17:17:59 +08:00   ❤️ 1
    最好的方案是使用 asyncio.Queue 来实现协程队列,不要把 asyncio.run 放到循环里,最好是从循环中往队列中塞 url ,然后使用 asyncio.create_task 来控制并发协程数量,从协程中不断读取队列内容实现并发,Semaphore 的实现是较为丑陋且性能低下的,可以看出你的代码风格还是受到了往进程池线程池硬塞内容风格的影响(我同事也喜欢这么写)
    ClericPy
        13
    ClericPy  
       2023-07-28 20:33:45 +08:00   ❤️ 1
    1. 每次读 1000 行,避免内存占用过多
    1000 行内存太小了, 给你个简单的换算公式: 1 万条 URL 大约 1MB 内存
    如果 api 响应较慢, 你会很多时候在等最后少数几条结束才能开始下一轮. 不如像他们说的丢异步队列, 然后开 100 个 task 去消费直到 Queue 空

    2. 利用 Semaphore 来控制并发数量为 100 ,避免 API 端把我给 ban 了
    100 并发实际上很大, 一般 nginx 上带点限流的, 一秒超过 2 个请求就给你 ban 了
    其次还要考虑目标网站承载能力, 否则只会导致 dos 然后失败率骤增. 不要敲打服务器 是爬虫第一课
    如果这服务器是自己家的... 千万行, 考虑改造下批量任务, 甚至这种请求量的就不像 HTTP 场景

    3. 复用 session
    没啥问题, 早期 aiohttp 还有 bug 不关 Session 一大堆事, 你分段来做也还好

    4. asyncio.run 与 get_event_loop
    建议找个 IDE 点 goto definition 看看源码, python 很多内置库源码都值得学习一下.
    一般情况下 asyncio.run 只运行一次, 其他各种逻辑放到 main 函数里就好了. 启动多次也没啥事

    挺简单个爬虫为啥那么纠结, 写个十几万行啥问题都没了. 你都用上 TaskGroup 了, 看看官方文档吧

    PS: 不是大佬, 辨别乱 Q...
    ohayoo
        14
    ohayoo  
    OP
       2023-07-28 22:15:13 +08:00
    @ClericPy #13 哈哈哈哈 因为经常看到你在 py 区安利异步协程,所以。。。。
    ohayoo
        15
    ohayoo  
    OP
       2023-07-28 22:17:01 +08:00
    感谢以上大佬的指教,我结合各位提到的点 再去琢磨琢磨
    julyclyde
        16
    julyclyde  
       2023-07-29 21:11:05 +08:00
    没看明白为什么还有 uris=[ ] 这一行
    你输入文件里每行有多个 uri 吗?
    ohayoo
        17
    ohayoo  
    OP
       2023-08-01 10:50:45 +08:00
    @Maerd #12 特别感谢大佬的指导,用 threading 的时候知道要用 queue ,用协程却不知道应该去看看 asyncio 的 Queue ,我的我的,感谢大佬点醒我
    zyxbcde
        18
    zyxbcde  
       2023-08-02 13:04:02 +08:00 via iPhone
    都是些伪需求啊
    文件那么点完全没必要分开读。
    也没必要卡信号量,aiohttp 在创建 session 时候有个参数可以限制最大连接数。
    另外我个人不太喜欢用上下文管理 session ,都是自己手动关闭。
    ohayoo
        19
    ohayoo  
    OP
       2023-08-02 13:37:54 +08:00
    @zyxbcde #18 后面用了 asyncio.Queue 设置队列长度,开固定数量的 task 去消费等等等等,反正 ok 了,后面复杂的需求直接找个任务队列算了
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1040 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 19:32 · PVG 03:32 · LAX 11:32 · JFK 14:32
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.