我要处理一个大文件, 用的是 libuv(reactor 模型)来处理异步 I/O,但是一个问题是,我非阻塞分段读了文件之后(文件很大),放到线程池中做一些修改,然后按顺序非阻塞写到另外一个文件里.
问题一是 写文件是远程写,而读文件是本地读,这样非阻塞 I/O 会导致内存爆炸.
问题二是 既然是非阻塞 I/O 我怎么,输出怎么拼回去.
1
learningman 2020-02-21 22:05:54 +08:00
disk cache
|
2
secondwtq 2020-02-22 01:42:56 +08:00 3
隐约觉得看到过有资料提过类似的情形: https://zeux.io/2019/04/20/qgrep-internals 从“First, the read speed and search speed can be substantially unbalanced.”开始
TLDR:作者实现了一个比 ag 和 ripgrep 等还快的代码检索工具(主要是加索引,ag 等一般是没索引的所以速度没法比),作者使用线程池来实现搜索,输入和输出都是单线程处理。在输入端用了一个队列,队列在数据超过一定量(也就是输入太快了)时会阻塞掉 push,在输出端使用了一个“ordered queue”,也就是 worker 线程在把结果交给输出线程时会附带位置顺序信息,输出线程只会在缓存里有下一块数据时才会写进去。 |
3
catror 2020-02-22 01:57:57 +08:00 via Android
1. 无论怎么读,你自己都可以控制读取速度的
2. 预分配文件,写的时候指定迁移即可,都有专门的函数 |
5
msg7086 2020-02-22 02:05:00 +08:00
分段读的时候检测一下内存占用,然后阻塞?
|
6
laminux29 2020-02-22 03:42:27 +08:00
建议题主尝试先不编程,而是在 Visio 上把流程图画一画,跟着流程走一走。
编程不仅是一门技术,更是一门艺术。 而艺术在于设计。 多设计,少 Coding,才是成为大神之道。 |
7
lihongming 2020-02-22 04:54:32 +08:00 via iPhone
参考 TCP 协议模型
|
8
FrankHB 2020-02-22 07:49:52 +08:00
@laminux29 要先设计是没错,但设计跟编程不冲突。只要是自己的设计,本质上总是可编程的。你看来只是要强调先不去拿代码实现罢了;然而画图真的就是给没有编程能力的外行人展示用的低效方法。
(严格意义上,图是可编程的,但拿 Visio 这种本来就不是为了这个目的设计的工具的方案,还不配实用。) |
9
xenme 2020-02-22 07:56:11 +08:00 via iPhone
就是 buffer 的概念么
读的时候放到你的临时文件,临时文件达到一定大小,相当于 buffer 满了,得阻塞读了 |
10
boywhp 2020-02-22 09:00:44 +08:00
读得快写得慢 你不会暂停一会再读?
|
11
Cbdy 2020-02-22 09:02:03 +08:00 via Android
那你可以读慢一点
|
12
rapiz 2020-02-22 09:24:13 +08:00
不就是生产者消费者模型吗
|
13
reus 2020-02-22 09:31:09 +08:00
|
14
Michaelssss 2020-02-22 09:44:00 +08:00 via Android
你让我想起了 Linus 的读取 240GB/S 的硬盘😂过多占用内存总线
|
15
lqf96 2020-02-22 09:48:32 +08:00 via iPhone
我怎么感觉这其实就是 backpressure 的问题…
|
16
encro 2020-02-22 10:00:51 +08:00
|
17
abutter 2020-02-22 10:21:38 +08:00
就是乱序然后重组,发给多个线程工作的时候要带标识,例如序号,文件偏移地址之类的唯一标识,处理完后的线程将数据交给一个线程,这个线程负责数组排序重组然后写入。
|
18
abutter 2020-02-22 10:23:49 +08:00
问题 1,可以通过线程池的线程的数量限制获得最大的的处理能力,然后读取要根据数据完成的情况进行,而不是只要可以读就读。
问题 2,如上一个回答。 |
20
23571113 OP @secondwtq 大佬我觉得加同步队列的方法靠谱,毕竟我感觉写的慢的话,处理的再快也没用.
LZ 是小白,导师也很久没写代码了,准确来说需要我做一个备份软件,存储接各大云服务商的存储服务(例如阿里云 OSS,AWS S3),然后能够做到把不同文件的相似数据块给去掉. 导师给了我一个月时间, 我现在的情况是相关论文基本都看完了, 语言会 C++和一些非常基础的库,golang 稍微入门. 第一版计划让我跑在两个节点上,一个做在线去重,一个做离线去重,请问来的及吗? |
21
23571113 OP |
22
abutter 2020-02-22 11:38:32 +08:00
你的需求我是没有完全看懂,啥叫能够“然后能够做到把不同文件的相似数据块给去掉”?不重复传递一样的数据?
云存储提供的 API 不同,可能实现方式不同。 |
24
abutter 2020-02-22 11:56:22 +08:00
librsync 应该有你需要的,用来确定文件是否变化,以及哪些部分变化。
还有一个问题,要看你是的云服务商提供的 API。否则需要加 meta 文件来解决。 |
25
FrankHB 2020-02-22 13:07:48 +08:00
Content-agnostic target deduplication... 没做过不清楚细节,之前找到过的也是文件系统级的。不过这种需求如果不是服务商给特别的支持,可以考虑用户侧自己把要传输的文件都压缩了,顺便也会 dedup。
恢复数据是指什么?你这备份数据的时候不会把原件给扔了吧。如果是指传输中断可以恢复进度,做日志实现事务。 |
27
Samuelcc 2020-02-22 18:23:00 +08:00 via Android
背压下
|
28
Sasasu 2020-02-22 18:30:18 +08:00
异步里很常见简单一个问题,除非你的程序是单纯的数据库 gui 的 api 都会有这个问题。
常见的思路是把线程分两组,一组用于异步事件轮询,一组用于阻塞任务。无论是 CPU 密集还是 IO 密集都可以这样搞,IO 密集阻塞如果去不掉的话就开一堆线程去做,CPU 密集就少开点线程。 比如 http://docs.libuv.org/en/v1.x/threadpool.html This thread pool is internally used to run all file system operations, as well as getaddrinfo and getnameinfo requests. https://docs.rs/tokio-threadpool/0.1.18/tokio_threadpool/ Backup threads are intended only to support the blocking API. asio 应该可以兼容手写的 thread poll 更 "工程" 的思路是实现抢占,比如 Go,只有一个 pool 然后一堆协程去抢,当然就会出现程序性能下限高上线极低。 |
29
Mutoo 2020-02-22 18:38:52 +08:00
生产者消费者 + 流模型,可以参考一下 node 的 steam 的实现,readable 和 writable 设计的非常好。
对其设定缓冲区(buffer/highWaterMark)。之间用 pipe 串连起来,完全是异步实现。 |
30
Sasasu 2020-02-22 18:41:23 +08:00
> 然后按顺序非阻塞写到另外一个文件里.
接受者一个持有一个当前 index,发送者把自己的 index - 1 一个 CAS 到接受者上 失败就休眠,等发送者唤醒自己 成功就返回成功,并唤醒发送者 发送者写完一个块就唤醒等在自己身上的接受者 |
31
Sasasu 2020-02-22 18:43:38 +08:00
writer 一开始一个持有一个当前 index = 0
worker 把自己的 index - 1 用 CAS 推到 writer 上 > 失败就休眠,等 writer 唤醒自己 > 成功就返回成功,并唤醒 writer writer 写完一个块就唤醒等在自己身上的 worker |
32
23571113 OP @Sasasu 我觉得 writer 不应该和线程池之间共用 index 之类的概念,因为他们是不同的东西. 我觉得按顺序完成应该由线程池来保证的. 比如线程池内部有两个变量, 一个是递增的给任务分配的任务号, 一个用来判断当前可以执行结束的任务号.
|
33
Sasasu 2020-02-22 21:06:23 +08:00
原理都差不多,但是没见任何 runtime 有现成的功能....
|
34
outoftimeerror 2020-02-22 22:01:43 +08:00 1
reactive stream 了解一下,akka-stream 提供了实现方式
|
35
DelayNoMore 2020-02-22 22:50:42 +08:00
如果是 go,用 waitgroup 搞定
|
36
reus 2020-02-23 08:11:37 +08:00 1
你们都是纸上谈兵……
实际上一个 content addressable 系统,一百来行就能实现核心功能 https://play.golang.org/p/LJWU2ut6_TH https://gist.github.com/reusee/fac296fd9b19d1b5a78650816518ebe2 |
37
reus 2020-02-23 12:21:30 +08:00
这个实际上就是和 bt 类似的东西,将文件或者文件夹分成多个块,每个块计算哈希,然后种子文件里只记录哈希和文件夹结构,这样就能用种子去下载整个文件。存储就直接用 kv 类就行,键是哈希,值是哈希对应的内容,这样相同的内容自然就只存一份了。
git 也是类似的思路,这些系统有个统称是 content addressable system。不难做的,用 go 吧。你看我花个十几二十分钟都做出个原型了,你们还在讨论 io 模型,讨论流程图…… |
38
23571113 OP @reus 基本是这样的,但是有些策略, 比如 chunk 划分不应该用定长的, 如果两份相同的数据, 把一份前面加一个字节, 定长的 chunk 就找不到重复的. (你可以搜一下 content defined chunking).
|
40
hardwork 2020-03-02 11:29:17 +08:00
大致的模型,主线程分段读,每读一段分发到 uv_queue_work 去处理,设置完成回调,回调中做发送处理。
问题 1, 读处理要控制内存,可以简单控制一下任务数量,发出去一个任务加一,发送完成一个减一,达到上限就休息一下,更精细的就记录内存使用量。精确等待控制可以用 cond signal 来做。否则 sleep 一下也可以。 问题 2, 顺序写回在回调里做,弄个数组存完成任务序号,回调里顺序检测数组从头开始发送,碰到还没完成的回调返回即可。记录一下上次顺序完成序号,下次检测从上次完成序号开始。 跑回调的线程和发任务的线程是同一个,所以上面检测都是线程安全的。 |