V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
23571113
V2EX  ›  C++

如果读文件的速度比处理的快怎么办

  •  
  •   23571113 · 2020-02-21 21:59:55 +08:00 · 6421 次点击
    这是一个创建于 1794 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我要处理一个大文件, 用的是 libuv(reactor 模型)来处理异步 I/O,但是一个问题是,我非阻塞分段读了文件之后(文件很大),放到线程池中做一些修改,然后按顺序非阻塞写到另外一个文件里.
    问题一是 写文件是远程写,而读文件是本地读,这样非阻塞 I/O 会导致内存爆炸.
    问题二是 既然是非阻塞 I/O 我怎么,输出怎么拼回去.

    40 条回复    2020-03-02 11:29:17 +08:00
    learningman
        1
    learningman  
       2020-02-21 22:05:54 +08:00
    disk cache
    secondwtq
        2
    secondwtq  
       2020-02-22 01:42:56 +08:00   ❤️ 3
    隐约觉得看到过有资料提过类似的情形: https://zeux.io/2019/04/20/qgrep-internals 从“First, the read speed and search speed can be substantially unbalanced.”开始

    TLDR:作者实现了一个比 ag 和 ripgrep 等还快的代码检索工具(主要是加索引,ag 等一般是没索引的所以速度没法比),作者使用线程池来实现搜索,输入和输出都是单线程处理。在输入端用了一个队列,队列在数据超过一定量(也就是输入太快了)时会阻塞掉 push,在输出端使用了一个“ordered queue”,也就是 worker 线程在把结果交给输出线程时会附带位置顺序信息,输出线程只会在缓存里有下一块数据时才会写进去。
    catror
        3
    catror  
       2020-02-22 01:57:57 +08:00 via Android
    1. 无论怎么读,你自己都可以控制读取速度的
    2. 预分配文件,写的时候指定迁移即可,都有专门的函数
    catror
        4
    catror  
       2020-02-22 01:58:50 +08:00 via Android
    @catror 迁移 --> 偏移
    msg7086
        5
    msg7086  
       2020-02-22 02:05:00 +08:00
    分段读的时候检测一下内存占用,然后阻塞?
    laminux29
        6
    laminux29  
       2020-02-22 03:42:27 +08:00
    建议题主尝试先不编程,而是在 Visio 上把流程图画一画,跟着流程走一走。

    编程不仅是一门技术,更是一门艺术。
    而艺术在于设计。

    多设计,少 Coding,才是成为大神之道。
    lihongming
        7
    lihongming  
       2020-02-22 04:54:32 +08:00 via iPhone
    参考 TCP 协议模型
    FrankHB
        8
    FrankHB  
       2020-02-22 07:49:52 +08:00
    @laminux29 要先设计是没错,但设计跟编程不冲突。只要是自己的设计,本质上总是可编程的。你看来只是要强调先不去拿代码实现罢了;然而画图真的就是给没有编程能力的外行人展示用的低效方法。
    (严格意义上,图是可编程的,但拿 Visio 这种本来就不是为了这个目的设计的工具的方案,还不配实用。)
    xenme
        9
    xenme  
       2020-02-22 07:56:11 +08:00 via iPhone
    就是 buffer 的概念么

    读的时候放到你的临时文件,临时文件达到一定大小,相当于 buffer 满了,得阻塞读了
    boywhp
        10
    boywhp  
       2020-02-22 09:00:44 +08:00
    读得快写得慢 你不会暂停一会再读?
    Cbdy
        11
    Cbdy  
       2020-02-22 09:02:03 +08:00 via Android
    那你可以读慢一点
    rapiz
        12
    rapiz  
       2020-02-22 09:24:13 +08:00
    不就是生产者消费者模型吗
    reus
        13
    reus  
       2020-02-22 09:31:09 +08:00
    Michaelssss
        14
    Michaelssss  
       2020-02-22 09:44:00 +08:00 via Android
    你让我想起了 Linus 的读取 240GB/S 的硬盘😂过多占用内存总线
    lqf96
        15
    lqf96  
       2020-02-22 09:48:32 +08:00 via iPhone
    我怎么感觉这其实就是 backpressure 的问题…
    encro
        16
    encro  
       2020-02-22 10:00:51 +08:00
    @laminux29
    @FrankHB

    除了先画图,或者直接编程,有一种中间方法是先写注释,写注释过程过程理清楚,注释中说明为什么要这样做,然后写代码时填空即可,时间不多(必要),代码质量还高,且不容易出错。
    一般我都会采用这种方式,偶尔涉及到比如多个系统多次交互,可能需要先画图。
    abutter
        17
    abutter  
       2020-02-22 10:21:38 +08:00
    就是乱序然后重组,发给多个线程工作的时候要带标识,例如序号,文件偏移地址之类的唯一标识,处理完后的线程将数据交给一个线程,这个线程负责数组排序重组然后写入。
    abutter
        18
    abutter  
       2020-02-22 10:23:49 +08:00
    问题 1,可以通过线程池的线程的数量限制获得最大的的处理能力,然后读取要根据数据完成的情况进行,而不是只要可以读就读。
    问题 2,如上一个回答。
    23571113
        19
    23571113  
    OP
       2020-02-22 11:11:31 +08:00
    @catror 问题是文件修改过了,我没法知道偏移量了.
    23571113
        20
    23571113  
    OP
       2020-02-22 11:25:17 +08:00
    @secondwtq 大佬我觉得加同步队列的方法靠谱,毕竟我感觉写的慢的话,处理的再快也没用.
    LZ 是小白,导师也很久没写代码了,准确来说需要我做一个备份软件,存储接各大云服务商的存储服务(例如阿里云 OSS,AWS S3),然后能够做到把不同文件的相似数据块给去掉. 导师给了我一个月时间, 我现在的情况是相关论文基本都看完了, 语言会 C++和一些非常基础的库,golang 稍微入门. 第一版计划让我跑在两个节点上,一个做在线去重,一个做离线去重,请问来的及吗?
    23571113
        21
    23571113  
    OP
       2020-02-22 11:30:10 +08:00
    @secondwtq
    @catror
    @msg7086
    @laminux29
    @FrankHB
    @boywhp
    @Cbdy
    @rapiz
    @abutter
    对了,客户还要能够恢复数据. 我现在愁死了,不知道如何下手. 大佬们帮帮忙看看我该从哪里开始.
    abutter
        22
    abutter  
       2020-02-22 11:38:32 +08:00
    你的需求我是没有完全看懂,啥叫能够“然后能够做到把不同文件的相似数据块给去掉”?不重复传递一样的数据?

    云存储提供的 API 不同,可能实现方式不同。
    23571113
        23
    23571113  
    OP
       2020-02-22 11:43:00 +08:00
    @abutter 比如两个文件有 4KB 的数据是相同的,我只存 4KB 数据到云上,而不是存 8KB.
    abutter
        24
    abutter  
       2020-02-22 11:56:22 +08:00
    librsync 应该有你需要的,用来确定文件是否变化,以及哪些部分变化。

    还有一个问题,要看你是的云服务商提供的 API。否则需要加 meta 文件来解决。
    FrankHB
        25
    FrankHB  
       2020-02-22 13:07:48 +08:00
    Content-agnostic target deduplication... 没做过不清楚细节,之前找到过的也是文件系统级的。不过这种需求如果不是服务商给特别的支持,可以考虑用户侧自己把要传输的文件都压缩了,顺便也会 dedup。
    恢复数据是指什么?你这备份数据的时候不会把原件给扔了吧。如果是指传输中断可以恢复进度,做日志实现事务。
    laminux29
        26
    laminux29  
       2020-02-22 14:23:33 +08:00
    @FrankHB 做编程就像建房砌砖一样。小房子,不做设计,先砌砖当然可行。但大型工程,必然要先有设计图。
    Samuelcc
        27
    Samuelcc  
       2020-02-22 18:23:00 +08:00 via Android
    背压下
    Sasasu
        28
    Sasasu  
       2020-02-22 18:30:18 +08:00
    异步里很常见简单一个问题,除非你的程序是单纯的数据库 gui 的 api 都会有这个问题。

    常见的思路是把线程分两组,一组用于异步事件轮询,一组用于阻塞任务。无论是 CPU 密集还是 IO 密集都可以这样搞,IO 密集阻塞如果去不掉的话就开一堆线程去做,CPU 密集就少开点线程。
    比如
    http://docs.libuv.org/en/v1.x/threadpool.html This thread pool is internally used to run all file system operations, as well as getaddrinfo and getnameinfo requests.
    https://docs.rs/tokio-threadpool/0.1.18/tokio_threadpool/ Backup threads are intended only to support the blocking API.
    asio 应该可以兼容手写的 thread poll

    更 "工程" 的思路是实现抢占,比如 Go,只有一个 pool 然后一堆协程去抢,当然就会出现程序性能下限高上线极低。
    Mutoo
        29
    Mutoo  
       2020-02-22 18:38:52 +08:00
    生产者消费者 + 流模型,可以参考一下 node 的 steam 的实现,readable 和 writable 设计的非常好。
    对其设定缓冲区(buffer/highWaterMark)。之间用 pipe 串连起来,完全是异步实现。
    Sasasu
        30
    Sasasu  
       2020-02-22 18:41:23 +08:00
    > 然后按顺序非阻塞写到另外一个文件里.

    接受者一个持有一个当前 index,发送者把自己的 index - 1 一个 CAS 到接受者上
    失败就休眠,等发送者唤醒自己
    成功就返回成功,并唤醒发送者

    发送者写完一个块就唤醒等在自己身上的接受者
    Sasasu
        31
    Sasasu  
       2020-02-22 18:43:38 +08:00
    writer 一开始一个持有一个当前 index = 0

    worker 把自己的 index - 1 用 CAS 推到 writer 上
    > 失败就休眠,等 writer 唤醒自己
    > 成功就返回成功,并唤醒 writer

    writer 写完一个块就唤醒等在自己身上的 worker
    23571113
        32
    23571113  
    OP
       2020-02-22 19:54:56 +08:00
    @Sasasu 我觉得 writer 不应该和线程池之间共用 index 之类的概念,因为他们是不同的东西. 我觉得按顺序完成应该由线程池来保证的. 比如线程池内部有两个变量, 一个是递增的给任务分配的任务号, 一个用来判断当前可以执行结束的任务号.
    Sasasu
        33
    Sasasu  
       2020-02-22 21:06:23 +08:00
    原理都差不多,但是没见任何 runtime 有现成的功能....
    outoftimeerror
        34
    outoftimeerror  
       2020-02-22 22:01:43 +08:00   ❤️ 1
    reactive stream 了解一下,akka-stream 提供了实现方式
    DelayNoMore
        35
    DelayNoMore  
       2020-02-22 22:50:42 +08:00
    如果是 go,用 waitgroup 搞定
    reus
        36
    reus  
       2020-02-23 08:11:37 +08:00   ❤️ 1
    你们都是纸上谈兵……
    实际上一个 content addressable 系统,一百来行就能实现核心功能
    https://play.golang.org/p/LJWU2ut6_TH
    https://gist.github.com/reusee/fac296fd9b19d1b5a78650816518ebe2
    reus
        37
    reus  
       2020-02-23 12:21:30 +08:00
    这个实际上就是和 bt 类似的东西,将文件或者文件夹分成多个块,每个块计算哈希,然后种子文件里只记录哈希和文件夹结构,这样就能用种子去下载整个文件。存储就直接用 kv 类就行,键是哈希,值是哈希对应的内容,这样相同的内容自然就只存一份了。
    git 也是类似的思路,这些系统有个统称是 content addressable system。不难做的,用 go 吧。你看我花个十几二十分钟都做出个原型了,你们还在讨论 io 模型,讨论流程图……
    23571113
        38
    23571113  
    OP
       2020-02-23 12:35:33 +08:00
    @reus 基本是这样的,但是有些策略, 比如 chunk 划分不应该用定长的, 如果两份相同的数据, 把一份前面加一个字节, 定长的 chunk 就找不到重复的. (你可以搜一下 content defined chunking).
    reus
        39
    reus  
       2020-02-23 16:01:24 +08:00 via Android
    @23571113 学习了
    hardwork
        40
    hardwork  
       2020-03-02 11:29:17 +08:00
    大致的模型,主线程分段读,每读一段分发到 uv_queue_work 去处理,设置完成回调,回调中做发送处理。
    问题 1, 读处理要控制内存,可以简单控制一下任务数量,发出去一个任务加一,发送完成一个减一,达到上限就休息一下,更精细的就记录内存使用量。精确等待控制可以用 cond signal 来做。否则 sleep 一下也可以。
    问题 2, 顺序写回在回调里做,弄个数组存完成任务序号,回调里顺序检测数组从头开始发送,碰到还没完成的回调返回即可。记录一下上次顺序完成序号,下次检测从上次完成序号开始。

    跑回调的线程和发任务的线程是同一个,所以上面检测都是线程安全的。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   917 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 20:39 · PVG 04:39 · LAX 12:39 · JFK 15:39
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.