V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
firejoke
V2EX  ›  Python

关于 asyncio 执行 IO 密集型操作的不解

  •  
  •   firejoke · 2021-11-20 22:17:36 +08:00 · 4236 次点击
    这是一个创建于 1154 天前的主题,其中的信息可能已经有所发展或是发生改变。

    有一个读文件然后写数据库的操作,想尝试使用协程。
    使用协程的:

    async def parse_text(file_path: Path, context_qs: [asyncio.Queue]):
        ql = len(context_qs)
        i = 0
        # 每一个 Queue 放 step 个数据就切换下一个
        step = 2
        with open(file_path, encoding="utf8") as f:
            for text in f:
                if i // step == ql:
                    i = 0
                context_q = context_qs[i // step]
                context = {}
                text = re.findall(r"\d+", text)
                if text:
                    context = {"解析然后组装成 dict"}
                    await context_q.put(context)
                    # 这里如果不 join ,会一直在这个 for 循环里不出去
                    await context_q.join()
                    i = i + 1
            else:
                await context_q.put("结束标记")
                return
    
    
    async def write_db(context_q: asyncio.Queue, model: ModelBase):
        async with AsyncSession() as session:
            while 1:
                context = await context_q.get()
                if context["结束标记"] == "end":
                    return
                info, obj = None, None
                try:
                    if context["info"]:
                        info = await session.execute(
                            select(InfoModel).filter(
                                InfoModel.attr == context["info"]
                            )
                        )
                        info = info.scalars().one_or_none()
                        if not info:
                            info = InfoModel(attr=context["info"])
                            session.add(info)
                    if context["header"]:
                        obj = await session.execute(
                            select(model).filter(
                                model.header == context["header"]
                            ).options(selectinload(getattr(model, "info")))
                        )
                        obj = obj.scalars().one_or_none()
                        if not obj:
                            obj = model(header=context["header"])
                            session.add(obj)
                    if obj or info:
                        if info not in obj.info:
                            obj.info.append(info)
                            session.add(obj)
                        await session.commit()
                except Exception as e:
                    await session.rollback()
                    raise e
                else:
                    context_q.task_done()
    
    
    async def main():
    	# 每个读取文件并解析的方法对应 c_q_count 个写数据库的方法
        c_q_count = 3
        a_context_qs = [asyncio.Queue() for i in range(c_q_count)]
        b_context_qs = [asyncio.Queue() for i in range(c_q_count)]
        tasks = [
            asyncio.create_task(
                parse_text(Path("a.txt"), a_context_qs)
            ),
            asyncio.create_task(
                parse_text(Path("b.txt"), b_context_qs)
            ),
        ]
        for i in range(c_q_count):
            tasks.append(asyncio.create_task(write_db(a_context_qs[i], AModel)))
            tasks.append(asyncio.create_task(write_db(b_context_qs[i], BModel)))
        await asyncio.gather(*tasks)
    
    
    
    if __name__ == '__main__':
        asyncio.run(main(), debug=settings.DEBUG)
    
    

    不使用协程的:

    def sync_read_file():
        af = Path("a.txt").open(encoding="utf8")
        bf = Path("b.txt").open(encoding="utf8")
        with Session() as session:
            while 1:
                if af:
                    try:
                        text = af.readline()
                        context = parse_text(text)
                        sync_write_db(session, context, AModel)
                    except IOError:
                        af.close()
                        af = None
                if bf:
                    try:
                        text = bf.readline()
                        context = parse_text(text)
                        sync_write_db(session, context, BModel)
                    except IOError:
                        bf.close()
                        bf = None
                if not af and not bf:
                    return
    
    
    def sync_write_db(session, context, model):
        info, obj = None, None
        try:
            if context["info"]:
                info = session.execute(
                    select(Info).filter(
                        Info.attr == context["info"]
                    )
                )
                info = info.scalars().one_or_none()
                if not info:
                    info = Info(attr=context["info"])
                    session.add(info)
            if context["header"]:
                obj = session.execute(
                    select(model).filter(model.info == context["info"]))
                obj = obj.scalars().one_or_none()
                if not obj:
                    obj = model(info=context["info"])
                    session.add(obj)
            if obj or info:
                if info not in obj.info:
                    obj.info.append(info)
                    session.add(obj)
                session.commit()
        except Exception as e:
            session.rollback()
            raise e
    
    
    if __name__ == '__main__':
        sync_read_file()
    
    

    这个协程的方法,每秒每个表可以写 400 多行,改为同步单线程的还是每秒写 400 多行。
    不知道是我协程的用法有问题?还是说有别的什么原因?

    36 条回复    2023-07-21 15:01:08 +08:00
    jenlors
        1
    jenlors  
       2021-11-20 23:11:37 +08:00
    试试 aiofiles 之类的,你的文件 IO 还是同步的
    Trim21
        2
    Trim21  
       2021-11-20 23:15:05 +08:00
    with open(file_path, encoding="utf8") as f:
    for text in f:

    这两行都是阻塞的
    firejoke
        3
    firejoke  
    OP
       2021-11-20 23:33:33 +08:00
    @long2ice #1
    @Trim21 #2
    文件这里只是读取,然后放进队列里,这也会导致阻塞吗?
    Trim21
        4
    Trim21  
       2021-11-20 23:36:52 +08:00
    @firejoke #3 同步 io 会阻塞掉整个事件循环。
    firejoke
        5
    firejoke  
    OP
       2021-11-20 23:47:33 +08:00
    @Trim21 #4 所以,也会导致如果不主动用队列的 join 阻塞住,就不会跳到其他 await 的地方?
    Nitroethane
        6
    Nitroethane  
       2021-11-20 23:48:52 +08:00 via iPhone
    @firejoke 如果读取文件速度比较慢,而且文件比较大的话影响应该比较明显
    firejoke
        7
    firejoke  
    OP
       2021-11-20 23:50:19 +08:00
    @Nitroethane #6 每行数据小于 1kb ,而且是用的 for ,这里相当于一个生成器
    Trim21
        8
    Trim21  
       2021-11-20 23:50:53 +08:00
    @firejoke #5 不是,你的代码中仅仅会阻塞在 open 和 for text in f 这两行。在等待这两行底层的同步 io 完成的时间里是不会运行其他 task 的。
    firejoke
        9
    firejoke  
    OP
       2021-11-21 00:19:02 +08:00
    @Trim21 #8 我改成了 asyncfiles ,然后把队列的 join 去掉了,这次成功跳到了其他 await 的位置,确实如你所说,感谢!
    但测试发现,虽然没了 io 的阻塞,但写入速度还是没太大变化,他每读一行,切到其他 task ,和我之前没读一行,join 住,就执行流程来说,是不是没差?
    Trim21
        10
    Trim21  
       2021-11-21 00:51:20 +08:00 via Android
    我没仔细看完整的代码,只是看到一开始就有同步阻塞的问题就回复了。
    locoz
        11
    locoz  
       2021-11-21 02:50:56 +08:00
    目测是正则导致的阻塞...有一说一你这种情况不太适合用 asyncio ,或者说不太适合没有包上隐式多进程的 asyncio ,毕竟不是纯粹的 IO 操作。然后文件操作方面 aiofiles 实际背后也是靠线程池跑的,这一点需要注意一下,有时候可能会导致踩坑。
    documentzhangx66
        12
    documentzhangx66  
       2021-11-21 03:29:02 +08:00
    先监视一下设备性能极限。
    iostat -x -m -d 1
    LeeReamond
        13
    LeeReamond  
       2021-11-21 04:07:18 +08:00
    大概看了一眼楼上说的应该没问题,并非所有类型的任务都能通过异步加速,你要做好心理准备。另外 aiofiles 的实现其实很丑陋。。楼上说是线程池跑的,我有点忘记具体情况了,只记得以前读源码的印象是很丑陋。。
    Contextualist
        14
    Contextualist  
       2021-11-21 08:17:31 +08:00
    看上去没有明显的问题,不过对于任何为了改进性能的重写建议还是先 profile 一下,看看瓶颈到底出在哪个调用上。

    然后异步文件 IO 不是为了提升性能(降低平均延迟)的,而是为了降低尾延迟的,参见: https://trio.readthedocs.io/en/stable/reference-io.html#background-why-is-async-file-i-o-useful-the-answer-may-surprise-you
    2i2Re2PLMaDnghL
        15
    2i2Re2PLMaDnghL  
       2021-11-21 08:47:44 +08:00
    (我会尝试先把所有信息读进内存然后 timeit 数据库部分,看瓶颈是不是文件
    lesismal
        16
    lesismal  
       2021-11-21 11:37:53 +08:00
    不是说给函数加上异步就是一切都异步了:
    1. 异步的函数 A
    2. A 内部调用 B C D ,B C D 有任意同步阻塞的行为,A 也一样跟着阻塞

    py 的性能痛点远不只是 asyncio 就能解决的了的,how about trying golang -_-
    firejoke
        17
    firejoke  
    OP
       2021-11-21 12:26:21 +08:00
    @locoz #11 我也感觉似乎没发挥出 asyncio 的优势,每一条数据都不超过 1kb ,所以可能除了数据库操作稍微耗时长一点,其他地方等待的很少,所以和单线程的性能差不多?另外请教一下,“没有包上隐式多进程” 具体是指什么呢?
    firejoke
        18
    firejoke  
    OP
       2021-11-21 12:30:43 +08:00
    @documentzhangx66 #12 设备性能应该没问题,12 核 24 线程,64G 内存,磁盘读取速度也没有跑满,IO 读写也不是特别高。
    firejoke
        19
    firejoke  
    OP
       2021-11-21 12:31:57 +08:00
    @LeeReamond #13 嗯,我昨天也想了一下,如果每一步阻塞住的操作实际上都很快,那 asyncio 其实发挥不出切换等待的优势。
    locoz
        20
    locoz  
       2021-11-21 12:37:05 +08:00
    @firejoke #17 建议用调试工具或者排除法看看具体是哪里拖慢了,单看代码和前面的讨论我感觉是正则部分导致的。

    前面没讲清楚,“包上隐式多进程的 asyncio”指的是把多进程和协程结合,开一堆子进程然后每个子进程一个 eventloop ,因为之前有看到过一个专门的库把这部分操作给隐式处理了,使用起来两三行搞定,不需要自己写进程管理部分。然后一些框架其实也会隐式地做这种结合处理来提高效率。
    firejoke
        21
    firejoke  
    OP
       2021-11-21 12:41:23 +08:00
    @Contextualist #14 看文档的意思,是说用异步文件 IO ,在从内存读取时反倒会变慢,在从磁盘读取的时候会加快,在不同环境下其结果是不可预测的。那我如果单独用一个进程读取文件到内存,然后另一个进程从内存读取然后再操作,应该可以绕开这个问题。
    firejoke
        22
    firejoke  
    OP
       2021-11-21 12:48:27 +08:00
    @locoz #20 我昨天最后也是改成用多进程了,一个进程专门读文件,然后放进队列,其他子进程从队列读,然后操作数据库,那看来我思路没跑偏。还有其他的解法吗?多进程和协程的结合,一般都是以多进程为主吗?
    Contextualist
        23
    Contextualist  
       2021-11-21 14:01:03 +08:00
    又看了一下你贴出来文件的部分,你是不是就两个大文件(就是说不是大量小文件),那文件 IO 就基本不可能是你的瓶颈,你看到磁盘读取没跑满很有可能是你下游的处理速度没跟上。

    多进程和协程,感觉你自己也总结出来了。协程得用在有长时间等待系统调用 (syscall) 的地方(比如网络、子进程、定时任务)。CPU 密集的操作得用多线程或多进程,但在 Python 里有 GIL ,就只能用多进程。
    firejoke
        24
    firejoke  
    OP
       2021-11-21 14:17:56 +08:00
    @Contextualist #23 是的,就是两个大文件,所以我也觉得文件 IO 不是我这里的瓶颈,协程在这个场景中没体现出他的优势,我已经改成了多进程了。
    Contextualist
        25
    Contextualist  
       2021-11-21 14:26:48 +08:00
    我对数据库不熟,不过我猜对于很多数据库并发写是不会有性能提升的,用单线程就可以了,但你可能需要 batch / bulk 操作,用来一次性插入数十条、数百条数据,而不是一次插入一条。
    O5oz6z3
        26
    O5oz6z3  
       2021-11-21 15:05:20 +08:00
    虽然不懂,看完楼上感觉原因之一在于 asyncio 的上限就是单线程,而单线程吞吐量不如多线程?
    firejoke
        27
    firejoke  
    OP
       2021-11-21 16:06:46 +08:00
    @Contextualist #25 对欸!资源是消耗在每一条查询和写入的操作上,如果批量写,就可以降低写入频率,至于查询,我已经在查询字段上加了索引,我改一下试试。感谢~
    然后我看到你之前提到的 trio ,看他的文档像是涉及到异步操作的都有涉及,感觉非常不错啊。
    firejoke
        28
    firejoke  
    OP
       2021-11-21 16:15:30 +08:00
    @O5oz6z3 #26 不是,当不存在较长的 io 等待的时候,协程和单线程没差。
    yufpga
        29
    yufpga  
       2021-11-21 20:01:53 +08:00
    大概看了下, 这瓶颈显然不是在 parse_text 中的文件读,就算再怎么阻塞,读写本地文件也不至于到每秒才 400 行的程度. 而在 write_db 中, 出现好几处 await 的地方, 这些地方可都是要同步等待结果返回的呀. 一个很好容易验证的方法就是把 write_db 中的 await 用 await asyncio.sleep 替换掉, 尝试不同的 sleep 时间. 实际上上面的问题在于每一次 while 1 的循环循环是同步的, 你必须要先处理完队列中的前一条数据, 才能继续处理下一条数据. 所以处理也很简单, 把每一次的循环异步化掉.
    hustlibraco
        30
    hustlibraco  
       2021-11-22 00:36:59 +08:00
    用```async for```替代```for```可以吗?
    firejoke
        31
    firejoke  
    OP
       2021-11-22 09:23:13 +08:00
    @hustlibraco #30 换成异步文件读,就可以换成 async for 了。
    firejoke
        32
    firejoke  
    OP
       2021-11-22 09:33:38 +08:00
    @yufpga #29 我看日志里,我同时开了好多个 task ,这个 task 的循环里 await query 或 add 或 commit ,就会跳到另一个 task 的循环里的 query 或 add 或 commit 。
    yufpga
        33
    yufpga  
       2021-11-22 10:17:06 +08:00
    @firejoke 是我看差了, 我以为只有一个 queue, 而你的代码里是两个 context, 各自 3 个 queue, 也就是总共 6 个 queue, 对应 6 个 write_db 的 task. 当遇到 await 的时候, 确实是会跳转到别的 task 里面执行. 确实比较奇怪,但我仍然觉得瓶颈不大可能在 parse_text, 你可以试着记录一下队列写入数据的速率, 如果这个速率也在 400/s 左右, 那说明确实有可能是 parse_text 慢了
    ohayoo
        34
    ohayoo  
       2021-11-22 23:33:57 +08:00
    @firejoke 老哥可以分享下多进程版本的代码吗?
    firejoke
        35
    firejoke  
    OP
       2021-11-23 22:00:11 +08:00
    @ohayoo #34 还在调试多进程和协程的组合,后面会贴一下的。
    mlbjay
        36
    mlbjay  
       2023-07-21 15:01:08 +08:00
    python 的 asyncio 是纯用户态线程,同步 io 会阻塞整个线程及其中的所有协程。
    Golang 的 MPG 模型就解决的这个问题。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2689 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 10:16 · PVG 18:16 · LAX 02:16 · JFK 05:16
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.