V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
drymonfidelia
V2EX  ›  程序员

要对单个 6.20TB 的超大 csv 文件保持顺序的情况下进行去除重复行,有什么好思路?显然不可能加载进内存

  •  
  •   drymonfidelia · 172 天前 · 11852 次点击
    这是一个创建于 172 天前的主题,其中的信息可能已经有所发展或是发生改变。
    101 条回复    2024-06-08 02:13:43 +08:00
    1  2  
    NXzCH8fP20468ML5
        1
    NXzCH8fP20468ML5  
       172 天前 via Android
    duckdb 值得拥有
    dcsuibian
        2
    dcsuibian  
       172 天前
    扔数据库不行吗?
    opengps
        3
    opengps  
       172 天前
    能想到的只有数据库
    buaasoftdavid
        4
    buaasoftdavid  
       172 天前
    内存里搞个哈希表,一行一行读 csv ,哈希表碰撞了就扔掉该行,没碰撞就插入哈希表再写到磁盘
    52boobs
        5
    52boobs  
       172 天前 via Android
    表的结构是怎样的,有天然的主键吗
    kneo
        6
    kneo  
       172 天前 via Android   ❤️ 1
    行数是多少?平均行长是多少?
    去重是应该基于整行文本还是列内容?比如 1.0 和 1 是否应该算做重复?
    每行前缀重复度是否够高?是否有某列( XXID )可以用于快速去重?
    机器性能如何?内存有多大?
    securityCoding
        7
    securityCoding  
       172 天前 via Android
    spark 干的活?
    drymonfidelia
        8
    drymonfidelia  
    OP
       172 天前
    @kneo 行数是 203 亿,平均行长 335
    去重是基于整行文本
    前缀重复度不高,没有 ID
    最高可以弄到 256GB 内存的服务器
    phrack
        9
    phrack  
       172 天前 via iPhone
    光就这点信息说个屁呢,一行 8 个字符,是几千亿行,一行 1M 字符,是几百万行,这能一样吗?

    内存也不说,4KB 内存和 4GB 内存能一样吗?
    drymonfidelia
        10
    drymonfidelia  
    OP
       172 天前
    @phrack 8 楼补充了
    rrfeng
        11
    rrfeng  
       172 天前
    去重后大概会有多少行知道吗
    lifanxi
        12
    lifanxi  
       172 天前   ❤️ 2
    遍历整个文件,每行都 hash 一下,把 hash 存到一个高性能 KV 里,不重复就输出当前行,重复就跳过当前行。
    drymonfidelia
        13
    drymonfidelia  
    OP
       172 天前
    @rrfeng 不知道
    cndenis
        14
    cndenis  
       172 天前
    如果不是要求严格不能丢数据的话, 可以用布隆过滤器去重, 误判率有公式可以算的, 有几十 GB 级别内存的话, 误判率应该比较低的
    phrack
        15
    phrack  
       172 天前 via iPhone
    开启 zram ,256g 可以当作 512g 没问题。

    sha1sum 一个占用 20 字节,200 亿差不多占用 372g ,没问题。

    极低概率去掉非重复行,几乎可以忽略。
    rrfeng
        16
    rrfeng  
       172 天前
    那就 bloomfilter 先过一遍看看情况。

    要硬算的话,就分块排序,排完序就好处理了。排序前记录下序号,最后还原一下顺序。
    hbcolorful
        17
    hbcolorful  
       172 天前
    redis 的布隆过滤器可以考虑下
    NXzCH8fP20468ML5
        18
    NXzCH8fP20468ML5  
       172 天前 via Android
    @drymonfidelia 看错了,还以为是 6GB 的 csv 文件在线处理呢,那确实不适合 duckdb 。

    还是上 spark 吧,硬盘配大点就行。

    203 亿行 csv 有那么大吗,我们每天备份全量的 17 亿行信息,保留几十天,用 orc 存储,也就几百 G 。
    kneo
        19
    kneo  
       172 天前 via Android
    感觉可以试试 clickhouse 。
    yinmin
        20
    yinmin  
       172 天前 via iPhone
    使用 apache spark ,用 python 的 PySpark 库试试,具体可以问 gpt-4
    NXzCH8fP20468ML5
        21
    NXzCH8fP20468ML5  
       172 天前 via Android   ❤️ 1
    1. hash
    2. 加序号
    3. 按照 hash 分区
    4. 逐个处理分区
    5. 分区内排序
    6. 分区外归并排序

    只有单机的话,可以考虑用 duckdb ,多机就用 spark 吧。
    yangxin0
        22
    yangxin0  
       172 天前
    分治:
    1 、用空间换时间(计算)
    2 、用时间(计算)换空间

    针对( 1 )有 spark 集群很快的,如果预算有限那么方法( 2 ):
    1 、把数据分成 N 块,并针对 N 块内进行去重
    2 、从 n 块中取一块,和剩下的 n-1 块去重,取这一块建立 hash or map 都可以,n-1 按照顺序读取
    3 、从剩下的 n-1 块中又进行步骤( 2 ), 直到 n=0
    4 、经过上述思路处理的 csv 就包含重复
    caola
        23
    caola  
       172 天前
    直接存入 kvrocks (硬盘版 redis)
    dacapoday
        24
    dacapoday  
       172 天前
    单文件这么大,文件系统压力也不小吧。多数文件系统单文件也不支持这么大吧
    james122333
        25
    james122333  
       172 天前 via Android
    sed 有往下查找一样内容行并删除的工具都可以 其它的都要内存或硬盘空间 vim 就差在它开启文件要暂存 不然也可以
    YTMartian
        26
    YTMartian  
       172 天前
    磁盘够用的话,先外部排序,然后直接读取,忽略与上一条相同的数据就行了吧,随机读取文件指定位置,也不用加载进内存
    dode
        27
    dode  
       172 天前
    按顺序处理,依据一个合适长度的前缀做分区,逐行文本进行处理,写入到对应分区下面。

    检索特定行文本,是否在对应分区内存在,不存在则写入,存在就返回已存在。
    chen7897499
        28
    chen7897499  
       172 天前 via Android
    emeditor
    NotLongNil
        30
    NotLongNil  
       172 天前
    上面有人提到的 Bloom Filter 应该是相对最优的解法了,实现简单,占用内存低,速度也快。唯一的问题就是要选择合适的长度,将错误率降低,这需要一定的算法知识,不过现在可以问 AI 了,让 AI 给出公式
    ClericPy
        31
    ClericPy  
       172 天前
    有点像 map reduce 场景,归并加快排

    问 AI 是好办法
    msg7086
        32
    msg7086  
       172 天前   ❤️ 2
    我能想到的两种不同的做法。
    第一种,在内存不足的情况下,放弃掉内存,直接用 SSD 读写。
    在 SSD 上开一个数据库(比如 MySQL 或者 Postgres ),把已经存在的 hash 写到数据库里。
    然后流式扫描每一行,取 hash 比对数据库,如果存在 hash 就跳过,不存在就写到结果集里并添加到数据库。
    要快速稳妥可以用两种不同的 hash ,比如 xxHash 做一次过滤,SHA1 做二次检验。

    第二种,在内存不足的情况下,分批处理。
    多次流式扫描每一行,取 hash ,每次只处理 hash 第一个 hex 字符相同的那些数据。
    第一次只索引和处理 sha1hash[0] == '0',第二次只索引和处理'1',这样可以把内存需求降到 1/16 ,缺点是 hash 计算也会是 16 倍。
    稍微优化一下的话,可以在第一次遍历的时候在数据上追加 sha1hash[0]作为分区标记,这样后面 15 次就不会重复计算,缺点是会每行多一两个字节,而且要多写入一次磁盘。
    esee
        33
    esee  
       172 天前 via Android
    什么叫保级顺序?比如在一百万的位置和 200 万的位置有重复项,则删除后面重复的那个是么。然后能提供多大的内存和硬盘,
    wxf666
        34
    wxf666  
       172 天前
    想到个方法,预计耗时:10 小时,准确率:100% 剔除重复行。


    ## 简单流程

    1. 分块排序。
    2. 同时循环每块,删掉非首次出现的重复行。
    3. 分别循环每块,按行号顺序,输出未被删掉的行。


    ## 详细流程

    1. 分块 240GB 文件,每块排序后,写入固态。同时保存每行长度+原始偏移量(约 (240 << 30) / 335 * 8 / 1024 ^ 3 = 5.7 GB )。
    2. 利用小根堆,流式排序(按 <string, offset> 排)所有分块每一行。非首次出现行,保存该行偏移量,到相应块的删除名单里。
    3. 循环每块,排序原始偏移量、删除名单,按序输出(未被删除的)相应行,至最终文件。


    ## 耗时计算

    1. 顺序读写:9 小时( 3 次顺序读,2 次顺序写,假设都为 1GB/s )。
    2. 内存字符串排序:< 1 小时(实测轻薄本 i5-8250U ,每秒归并排序 200W 次 335 长度的随机字符串,约 6900W 次比较)。
    - 多线程快排/归并:`(每块行数 = (240 << 30) / 335) * log2(每块行数) * 块数 = 6017 亿` 次比较,我的轻薄本 8 线程需 0.3 小时。
    - 单线程小根堆:`202e8 * log2(块数 = 6.2 * 1024 / 240 = 26.5) * 2 = 1910 亿` 次比较,需 0.7 小时。
    wxf666
        35
    wxf666  
       172 天前   ❤️ 1
    34 楼纠正下数据,实测轻薄本 i5-8250U ,1.5 秒归并排序 320W 个 336 长度的随机字符串,约 6500W 次比较。

    - 多线程快排/归并:总计 6017 亿次比较,我的轻薄本 8 线程需 0.5 小时。
    - 单线程小根堆:总计 1910 亿次比较,单线程需 1.2 小时。

    差不太远。。
    wxf666
        36
    wxf666  
       172 天前
    @dcsuibian #2 ,@opengps #3 ,@msg7086 #32:
    如果数据库,每秒写入 10W 条,总计要 203e8 / 1e5 / 3600 = 56 小时?

    @YTMartian #26 ,@dode #27:
    就算固态 4K 随机读写有 10W IOPS ,算下来也要 56 小时吧?

    wxf666
        37
    wxf666  
       172 天前
    @cndenis #14 ,@hbcolorful #17 ,@NotLongNil #30:

    用布隆过滤,几十 GB 好像不够。
    在线算了下,50 GB + 15 函数,都会有 1 / 25000 概率出错。。
    250 GB + 11 函数,算完 203 亿行,才能有 83.8% 的概率,一个不出错?


    @phrack #15:

    压缩内存,来存 hash ?好像真的可行。。
    平均而言,写入 (372 << 30) / 4096 = 1 亿次,就会占满 372 GB 内存页。即,几乎一开始就会启用 zram ?
    我在别处看了看,lz4 每秒能有 200W 次 IO ,算下来要 2.8 小时即可?
    话说,这个和 Bloom Filter 相比,哪个出错概率小呢?


    dingwen07
        38
    dingwen07  
       172 天前
    @wxf666
    Bloom filter 的假阳性率是要看哈希函数的数量的吧
    hobochen
        39
    hobochen  
       172 天前
    大概想了一下,一定深度的前缀树,叶子节点是哈希表或者平衡树存原来的行号应该是一个可行的方案。
    hobochen
        40
    hobochen  
       172 天前
    更正:叶子节点应当是一个哈希表/平衡树; k 是哈希值,v 是行号
    noqwerty
        41
    noqwerty  
       172 天前 via iPhone
    可以试试 polars streaming + file sink ? https://www.rhosignal.com/posts/streaming-in-polars/
    dode
        42
    dode  
       172 天前
    @wxf666 这个是连续 IO 的读写很快,文件在 Linux 系统中读写也可以开启内存缓存
    dode
        43
    dode  
       172 天前
    @wxf666 固态硬盘随机读写 10 IOPS ,服务器也可以搞几块 U3 固态,存这个计算缓存加速呀
    tonywangcn
        44
    tonywangcn  
       172 天前
    @wxf666

    你的计算好像不对吧

    n = 20300000000
    p = 0.000000001 (1 in 999925223)
    m = 875595082773 (101.93GiB)
    k = 30

    https://hur.st/bloomfilter/?n=20300000000&p=1.0E-9&m=&k=
    acapla
        45
    acapla  
       172 天前
    不限时间的话:

    for (i = 1; i < N; i++)
    for (j = 0; j < i; j++)
    if line[i] == line[j]:
    skip line[i]
    HanashirodotETH
        46
    HanashirodotETH  
       172 天前
    分块 - 用 96 位哈希编号,去重,排序 - 多路归并
    但是也要大概 240G 内存。
    wxf666
        47
    wxf666  
       172 天前
    @dingwen07 #38 是的。37 楼有提及到,用多少哈希函数。


    @dode #42

    《写入到对应分区下面》这个是缓存尽可能多的文本(如 1GB ),再写入,是吗?
    《检索特定行文本,是否在对应分区内存在》这个是如何做到,顺序读的呢?


    @tonywangcn #44

    平均每十亿条,就误认为一次,某行是重复行,导致丢失该行?
    那你要问 @drymonfidelia 愿不愿意丢失几十行数据了。。
    sunnysab
        48
    sunnysab  
       172 天前
    @phrack sha1sum 本身如何存储或索引呢?使用 BTree (或类似的树)吗,还是多级 hash 索引?
    wxf666
        49
    wxf666  
       172 天前
    36 37 楼,好像没 @ 成功。。再试一下。。


    @opengps #3
    @msg7086 #32

    如果数据库,每秒写入 10W 条,总计要 203e8 / 1e5 / 3600 = 56 小时?


    @hbcolorful #17
    @NotLongNil #30

    用布隆过滤,几十 GB 好像不够。
    在线算了下,50 GiB + 15 函数,都会有 1 / 26000 概率出错,大约丢失 80W 行数据?
    250 GiB + 11 函数,算完 203 亿行,才能有 83.8% 的概率,不丢失任何数据,也不保留任何重复行?


    hguangzhen
        50
    hguangzhen  
       172 天前
    惊了~ 竟然没人提到 RocksDB 吗? 本地的文件型 KV 存储库,内置 bloomfilter ,磁盘空间够用,应该很简单的
    mayli
        51
    mayli  
       172 天前
    如果你只是想最简单的解法(不考虑最高效率或者多机并发)可以试试 sort+uniq

    sort 是可以排序比内存大的文件的: https://vkundeti.blogspot.com/2008/03/tech-algorithmic-details-of-unix-sort.html
    然后排序后的 uniq 是不怎么吃内存

    不过我看有个需求是要保持文件顺序的话,你可以用 uniq --repeated 来找到重复行,如果你重复行不多,那搞个脚本直接过滤一遍源文件就好,也是线性的。
    Kaiv2
        52
    Kaiv2  
       172 天前
    1. 先计原始文件 a.txt 算每一行 hash 保存到 hash.txt 文件
    2. 复制一份 hash.txt -> hash-2.txt 用于去重计算
    3. 取 hash-2.txt 文件中 10000(这个数根据内存大小预估) 个 hash 前 8 位不重复 hash_array_8
    4. 重复的的写入 hash-4.txt, 剩于的写入 hash-2.1.txt -> hash-2.txt , 循环处理直到 hash-2.txt 没有记录
    ```txt
    let limit = 10000; // 控制内存使用
    let hash_array_8 = [];
    let cache_line = []
    for(let h_line: read_line(hash_2.txt)) {
    if(hash_array_8.size < limit) {
    if(!hash_array_8.has(h_line.sub(8))) {
    hash_array_8.add(h_line.sub(8))
    }
    }
    if(hash_array_8.has(h_line.sub(8))) {
    if(cache_line.has(h_line)) {
    write(hash-4.txt);
    } else {
    cache_line.add(h_line);
    }
    } else {
    write(hash-2.1.txt);
    }
    }
    mv(hash-2.1.txt, hash-2.txt)
    ```
    5. 得到 hash.txt 跟文件一一对应,hash-4.txt 是重复的记录
    6. hash-4.txt (如果重复的不多)直接读取到内存,对应读取 a.txt, hash.txt 每一行,比较 hash 重复跳过,不重复写入 b.txt
    没有考虑过计算量,内存不够可以考虑试试这个办法
    Kaiv2
        53
    Kaiv2  
       172 天前
    @Kaiv2 写着写着写成了单机的,这么做多此一举,太蠢了。。。应该是 分 hash-3.1 .. n.txt 多个机器同时处理,然后合并重复数据 hash-4.1..n.txt
    drymonfidelia
        54
    drymonfidelia  
    OP
       172 天前
    @wxf666 不能丢失数据
    @esee 是的。最大可以提供 256GB 内存,硬盘没有限制
    msg7086
        55
    msg7086  
       172 天前
    @wxf666 #35 上 TB 的数据怎么处理都是会很慢的。(一秒 10w 条数据可能到不了)

    我建议用第三方数据库纯粹是因为这样对实现的要求最低,不需要你搞大内存服务器,不需要自己开发复杂的算法,全部用已知的成熟的方案,你只要插上一堆 SSD 然后干别的事就行了,等个几天数据就都跑完了。算法简单所以要根据需求修改起来也简单,可维护性也好。(用人话说就是,工程师不需要加班,让服务器加班就行。)

    现实当中从 SSD 读取数据到内存也是要花时间的,这么大的量级还要跑前后依赖的操作,我是觉得快不起来。

    (如果能并行 map reduce 倒是能快不少,但这里不太行。)
    lmshl
        56
    lmshl  
       172 天前   ❤️ 1
    版本答案:RocksDB (与其他 leveldb family 产品)

    解析:203 亿个 bloomfilter 在 p=0.01 下所需的内存空间约为 23.75GB 。实际上,去重所需的空间会少于 203 亿,所以在这个内存空间下,实际 p 值将进一步降低。

    大部分人可能对 bloomfilter 的使用存在误解,他们只考虑在只有 bloomfilter 单一算法存在的前提下来解决需求,这显然是错误的。现代数据库对 bloomfilter 的应用主要是用来降低 miss key 对磁盘 IO 的影响。如果 bloomfilter 认为这个 key 没有出现过,那么这个 key 确实没有出现过。当 bloomfilter 认为它可能出现过,那么出现的概率为 1-p ,此时需要回表二次确认(磁盘 IO )。

    假设一个典型的重复度为 10 倍的 200 亿数据表文件,在这个空间下,p 值会低至 1e-20 。

    那么对这个文件去重,总共会发生 200 亿次内存 bloomfilter 读取,20 亿次 bloomfilter 写入+磁盘顺序写入,以及 180 亿次磁盘随机读取。(考虑到数据库对磁盘的批量写入优化,sstable/memtable 这个数值将会被巨幅降低)

    假设一个重复度为 0.1 倍的 200 亿数据表文件,在这个空间下,p 值变化不大。

    那么对这个文件去重,总共发生 200 亿次内存 bloomfilter 读取,180 亿次 bloomfilter 写入+磁盘顺序写入,以及 20 亿次磁盘随机读取。(同上)

    根据网上其他人做的吞吐量测试,rocksdb 在现代硬件条件下可以稳定达到 10k*rows/s 以上的写入性能,或>1GB/s 的写入吞吐量。乐观地估计,6.2TB 的文件应该能在 2 小时到 2 天左右完成去重。
    cabbage
        57
    cabbage  
       172 天前 via Android
    @wxf666 布隆过滤器单独用确实不行,免不了假阳性,即便事后检查那也逃不了随机读,不是稳定的方法。但如果纯粹作为首次顺序读的过滤器来用,应该还不错,可以降低一些输入数据量。

    to 34 楼:其实没看懂第二步的堆排序怎么回事,是说在一次排序中对所有行进行排序并去重吗?如果这样的话,那比较的时候是不是还需要在内存里保留所有原始 string ?来讨教下,乐意的话可否点明一二?
    harmless
        58
    harmless  
       172 天前 via iPhone
    203 亿行全部计算成 hash ,再加上行号,大概 700 多 G
    1. 遍历每行,计算出 hash ,按 hash 第一位将 hash 和行号写入不同的文件,例如 hash 第一位为 0 ,则写入 hash_0.txt ,这样一共会有 16 个文件,每个文件大概 40 多 G
    2. 分别对每个文件按 hash 排序,找出重复的需要删除的行号,记录到文件中
    3. 遍历原始文件,删除需要删除的行
    lrjia
        59
    lrjia  
       172 天前
    先 hash ,按照 hash 前缀分块成多个文件,使分块后单块的大小可以放入内存。再对每块使用 hash 表去重。最后合并多个文件,用归并排序的做法。这中间应该都是文件的顺序读写。
    cloudzhou
        60
    cloudzhou  
       172 天前   ❤️ 1
    布隆过滤器,申请硬盘 db 空间作为布隆过滤器存储,按位标记,只要空间足够大,冲突就很小
    在布隆过滤器冲突情况下,将冲突部分,存入到其他人提到的某种 kv db ,然后排除重复处理
    wxf666
        61
    wxf666  
       172 天前   ❤️ 1
    @phrack #15

    突然想起来,zram 可能行不通。。

    sha1 的结果,是不是相当于随机数据?随机数据,压缩不了啥吧。。

    即,256 GB 内存,当不了 512 GB 用?全都用来存压缩比 100% 后的数据了?


    @cabbage #57

    需要保留 27 行原始 string ,在小根堆里。

    第二步目的:检查出各块中,偏移量非最小的重复行,记录进删除名单中。

    第二步时,已经有 26.5 个 240GB 的、排序好的块。

    参考多路归并,可以流式构造出有序的 (string, offset, chunk_index)。

    当 string 与上一个不同时,说明碰到偏移量最小的新行了(即,全文首次出现)。

    当 string 与上一个相同时,说明重复行了,此时往 "to_del_${chunk_index}.txt" 里记录 offset 。

    (可以攒多点再写,反正只用了 27 个字符串 + 27 * 1GB 缓冲区,还剩 200+GB 内存呢。。)

    以前写过类似的,10+ MB 内存去重 13 GB 文件,里面也有用到多路归并:/t/1031275#reply15
    conan257
        62
    conan257  
       172 天前
    学习下
    haha1903
        63
    haha1903  
       171 天前
    hyperloglog
    peterxulove
        64
    peterxulove  
       171 天前
    对每一行进行哈希,哈希后的哈希值每个位置的值为一个叶子结点建立二叉树,哈希值的位数就是二叉树的层级,判断的时候遍历二叉树即可。
    XuYijie
        65
    XuYijie  
       171 天前 via Android
    把数据读到数据库,然后分批处理,或者使用 easyExcel 分批读取,处理后把处理后的数据导出到一个新的 csv
    winiex
        66
    winiex  
       171 天前
    以行顺序读内容,记录一个每行的 hash 值,发生碰撞抛弃当前行
    IwfWcf
        67
    IwfWcf  
       171 天前
    如果只是要去重那分块排序就好了,如果要保留原有顺序的话那前面有人提到的 bloom filter 命中的情况下再去数据库查询确认应该是最优方案了
    qdwang
        68
    qdwang  
       171 天前
    203 亿行,每一行做 BLAKE3 ,按照 bit 为 1 的和,来进行分类,共分 256 类。

    平均每类大小 20300000000 * 256 / 8 / 1024 / 1024 / 1024 / 256 = 2.36G

    存成 256 个分类文件,根据你可用内存大小,载入对应数量的分类在内存里作为 cache 。未命中,就随机内存里去除一个,替换成新的。也很容易做成分布式,每台机器管理一定数量的分类。

    由于 BLAKE3 目标是 128bits 安全,所以你可以缩小到 128bit 使用,256g 内存电脑可以装得下 210 个分类在 cache 里。
    qdwang
        69
    qdwang  
       171 天前
    说错了,BLAKE3 使用 128bit 后,每个分类仍然是 2.36g ,但是分类数量减小到 128 个。
    vivisidea
        70
    vivisidea  
       171 天前
    1. 计算每一行的 hash , 保存到文件里 hashes.txt
    2. 对 hashes.txt 进行外部排序,外部排序对内存要求低,压力给到磁盘
    3. 遍历 hashes.txt 找出重复,因为已经有序,所以简单对比当前行和上一行即可知道是否重复
    Hawthorne
        71
    Hawthorne  
       171 天前
    不能用内存映射吗?
    psyer
        72
    psyer  
       171 天前 via Android
    @XuYijie pandas 的 easyExcle?会不会非常慢…
    psyer
        73
    psyer  
       171 天前 via Android
    pyspark 效果如何?😁
    qinrui
        74
    qinrui  
       171 天前
    emeditor 直接打开处理
    cocalrush
        75
    cocalrush  
       171 天前
    直接用阿里云的 odps 生成个离线表跑下是不是就行了...
    Keuin
        76
    Keuin  
       171 天前   ❤️ 1
    awk 每行尾追加逗号和行号,整个文件每个行都追加一下,占 6.2T
    unix sort 工具外排序,直接按字母表排序,占 6.2T 。重复行会变成相邻的,编号不一样。输出另占 6.2T
    用 awk 配合 uniq ,去重,全内存 O(1)空间算法,输出占 6.2T ,即为最终结果

    中间文件可以在不用的时候删掉,最大同时出现 2 份,也就是需要额外 2*6.2T 磁盘空间,由于都是流式算法,内存用量为很小的常数
    Keuin
        77
    Keuin  
       171 天前
    @Keuin 最后还差一步,按行号升序排序,重新排序回原来的顺序,最大额外磁盘空间不变
    qbmiller
        78
    qbmiller  
       171 天前
    再来一个文件 csvA. 然后这 2 文件 ,双指针遍历. A 放不重复的,
    基于顺序的话,是可以的
    blankmiss
        79
    blankmiss  
       171 天前
    或者你给个脱敏德样本数据出来看
    cndenis
        80
    cndenis  
       171 天前
    @wxf666 如果需要严格不能丢数据的话, 不能单用布隆过滤器.

    假设重复率比较低的的话,, 可以做两轮读取

    第一轮边读边构造布隆过滤器, 把发现的冲突的行记录到数据库

    第二轮先把数据库中值导入新的布隆过滤器, 然后用它来过滤原表, 对有冲突的行查用数据库确证没重复再输出
    Dream95
        81
    Dream95  
       171 天前
    为啥关注点都在去重上面,就按照 MapReduce 的思路归并排序,再去重不就很好
    zhuangzhuang1988
        82
    zhuangzhuang1988  
       171 天前
    直接 sqlite 试试

    ```python
    import sqlite3

    with sqlite3.connect("_temp.db") as conn:
    c = conn.cursor()
    c.execute(
    """
    CREATE TABLE kv (
    line TEXT UNIQUE
    );"""
    )
    with open("xxx.csv", "r", encoding="utf8") as fin:
    with open("xxx1.csv", "w", encoding="utf8") as fout:
    for line in fin:
    line = line.strip()
    if line:
    r = c.execute(
    "insert into kv(line) select :line where :line not in (select line from kv)",
    {"line": line},
    )
    if r.rowcount > 0:
    fout.write(line + "\n")
    ```
    ltux
        83
    ltux  
       171 天前
    不能全部加载到内存,那没法用哈希表去重。
    简单地归并排序,再顺序读取去重就完了。归并排序是稳定排序,可以保持行原来的顺序,也适合用于对超出内存限制的数据排序。
    结果就是 gnu sort 就完了-_-,加 --unique 选项一步到位。

    sort --stable --unique --output=OUTPUT.csv INPUT.csv
    根据你 cpu 核心数量 N 加个 --parallel=N 选项就完
    lsk569937453
        84
    lsk569937453  
       171 天前
    @zhuangzhuang1988 帮你试过了,一秒处理 600 行,202 亿行大概要处理 389 天。
    chutianyao
        85
    chutianyao  
       171 天前
    203 亿行,逐行 hash, 假设 hash256, 单个值占用内存 32 字节, 203 亿行差不多试用内存 604G

    1. 逐行读取并进行 hash
    2. 使用 hash 值构建前缀树
    3. 对每一行的哈希值,有两种情况:
    1) 前缀树中已经存在, 说明哈希值重复, 该行重复了. 操作: 直接忽略本行,读取并处理下一行
    2) 前缀树中不存在, 说明行不重复. 操作: 新建文件 result.csv, 将该行追加到 result.csv 中, 再处理下一行

    关键点:
    1.所有行的哈希值占用空间 604G, 内存才 256G 无法直接存储; 使用硬盘存储后续逐行比对查找的性能太差, 所以这里使用前缀树来存储, 减少相同前缀的哈希值使用的内存空间.(具体能节省多少内存,取决于哈希值/文本行的重复比例, 极端情况 203 亿行都不重复的情况下, 前缀树估计也会把内存耗尽?)
    2.发现重复行,不直接从原文件中删除, 而是新建文件保存结果. 目的是使用追加写文件的形式、减少随机读写文件造成的性能磁盘 io 损耗
    MoYi123
        86
    MoYi123  
       171 天前
    感觉很多人不懂“保持原先顺序是什么意思”

    是[1,3,2,3] -> [1,3,2]
    而不是[1,3,2,3] -> [1,2,3]
    lsk569937453
        87
    lsk569937453  
       171 天前
    @chutianyao 1.系统的最大内存是 256G ,当所有的行都不相同的时候会占用 604G 啊。你不会假定有多少重复吧。。。。
    forty
        88
    forty  
       171 天前
    学到了 1 个新知识: 布隆过滤器
    感谢大家!

    OP 的这个数据量,用哈希表也足够处理了。也可以先布隆一遍,找出一定不存在重复的,再用哈希排查不确定是否重复的。

    化整为零,先用哈希进行分类,再在分类内部进行除重(省内存,时间换空间)。

    用普通的编程语言,普通的 PC 即可,不依赖其他数据软件。

    203 亿 介于 2^34 与 2^35 (2 的 35 次方) 之间,按 2^35 算,因此 35 比特就能表示行号,可以给它 5 个字节。

    用哈希进行分类,分多少个类就写多少个文件,只记录 MD5 和行号。
    全部分类文件都写完之后,依次载入 1 个分类文件到内存,用哈希表除重,输出哈希重复(应删除的行)的行号,问题就基本解决了。

    如果分 65536 个类,则每个分类下约有 50 多万个数据,每个分类文件约 10MB 。

    如果分 256 个类,则每个分类下约有 8 千万个数据,每个分类文件约 1.6GB ,老 PC 也能干。

    如果分 16 个类,则每个分类下约有 13 亿个数据,每个分类文件约 26GB ,现在的普通 PC 都可以胜任。

    如果强迫症觉得可能有哈希冲突,那就可以再加 1 个不同的哈希算法,对这个数量级来说是基本不用考虑 MD5 冲突的。
    chutianyao
        89
    chutianyao  
       171 天前
    @lsk569937453 所以我说了嘛,看行重复比例. 同时哈希值前缀相同, 也能节省一些内存吧.
    这个方案只是存在一定可行性,但不保证
    lttzzlll
        90
    lttzzlll  
       171 天前
    cat input.csv | sort | uniq > out.csv

    力大出奇迹。优化阻碍发展。你先试试再说。大不了机器崩了呗。
    sampeng
        91
    sampeng  
       171 天前
    这么点数据就要 spark ,mr 了?
    楼上很多说的没错啊。。你倒是试试 sort |uniq 之后看看结果啊。慢是肯定慢,但是试试不比你纠结强。。

    =====

    rocksdb 是一个解决方案。但如果不想上东西自己算也不是不行。自己构建结构体和硬盘文件内映射关系。hash 一定要在内存里面才能对比?在文件里面就不行么。现在都是 ssd ,随机读取没啥吧。

    我猛的一想就是

    1.hash 直接建在硬盘上。每次对比用 seed 偏移来查找。这种业务使用最好别用布隆,毕竟不是近似求结果。而是最终求结果。

    2.6T 文件。内存里只建一个够 N 条的 hash 。先读 N 条。计算 N 条里的没有重复的。保存到文件 a 。然后一直递归下去。得到 n 个小文件。然后问题就变成了 n 个小文件去重的问题。内存大,就把第一个文件读出来,去其他文件一个一个比。以此递归处理。当然,连小文件都不需要,自己规划好数据结构把 6T 文件看成 n 个小文件也是一个逻辑。这个逻辑下哪怕 1G 内存也能算出来。就看时间了。
    sampeng
        92
    sampeng  
       171 天前
    我也是想多了。。哪有这么复杂。

    读一条。算 hash 。然后读下一条,算下一条的 hash 。相同就扔掉。。。没有相同的就写到另一个文件里面去。一个递归好像就完事了。这应该也是 sort|uniq 的逻辑。只需要内存 (每行 byte *2 )。这就是纯粹比 ssd 的速度。想加速就是利用 cpu 的并行运算搞搞分块就好了。
    lttzzlll
        93
    lttzzlll  
       171 天前
    @lttzzlll cat input.csv | sort -u | uniq > out.csv 你试试
    james122333
        94
    james122333  
       170 天前 via Android
    @lttzzlll

    还是看我讲的吧 每行往下查有重複就删除 毕竞 global 搜索删除还是挺好的 一行处理完换下一行 也顺带保证了顺序 内存与硬盘都可以占用少
    james122333
        95
    james122333  
       170 天前 via Android
    @lttzzlll

    应该算是种高明的手法
    LieEar
        96
    LieEar  
       170 天前
    6.20TB 的超大 csv ,难以置信。
    我觉得可以试试 DuckDB
    kuagura
        97
    kuagura  
       170 天前 via Android
    换一个 10T 内存的机器 采购 你都有得赚,项目结束再卖掉
    psyer
        98
    psyer  
       170 天前 via Android
    看到这么多讨论,给出方案,有没有人给一个切实可行的代码?实践是检验真理的唯一标准。
    wxf666
        99
    wxf666  
       167 天前
    C++ 新人,写个去重练练手。

    - 结果:2.50 GB 文本( 900 万行,336 字/行),1GB 内存限制,6 秒保持顺序地去重完毕。
    - 硬件:七年前 i5-8250U 轻薄本,读写在内存盘中(读 8G/s ,写 3G/s ,1000 元 2TB 固态都能有的速度,不过分吧?)
    - 预计:4 小时能去重完毕 6.20TB ?

    新人刚学会写,可能还有诸多不足之处。
    写的过程中,还有很多优化点没写。比如:

    1. 排序时,子范围太小,转为其他排序方式。
    2. 读写文件,用的默认缓冲区大小( 4K ? 16K ?不知道多大,估计很小。。)
    3. 分块时,可以去除重复行,减少稍后读写数据量。

    继续改进点:

    - 转用 hash 去重,大幅减少硬盘读写数据量。
    - 只是要承担极小概率重复风险。但 Git 也在用这种方式。。
    - 实在不行,发现重复 hash 时,再去读原文件完整比较。

    ## 截图


    ## 代码

    ```c++
    // V 站吞空格,缩进改为全角空格了

    #include <queue>
    #include <vector>
    #include <thread>
    #include <cstring>
    #include <sstream>
    #include <fstream>
    #include <iostream>
    #include <algorithm>
    #include <stdexcept>
    #include <filesystem>
    #include <string_view>
    #include <fcntl.h>
    #include <unistd.h>
    #include <sys/mman.h>
    #include <sys/stat.h>

    using std::ios;
    using std::vector, std::string_view;
    using std::to_string, std::ofstream;
    namespace fs = std::filesystem;

    int max_thread = 8;
    size_t max_memory = 1ull << 30;
    const auto tmpDir = fs::temp_directory_path();

    struct Meta {
       ptrdiff_t offset;
       size_t length;
       friend ofstream& operator<< (ofstream& ofs, const Meta& self) {
         ofs.write(reinterpret_cast<const char*>(&self), sizeof self);
         return ofs;
      }
    };

    struct Line {
       int chunkIdx{};
       ptrdiff_t offset{};
       string_view str{};
       auto operator> (const Line& other) {
         return std::tie(str, chunkIdx, offset)
          > std::tie(other.str, other.chunkIdx, other.offset);
      }
    };

    template <class T = char>
    class MappedFile {
       int fd = -1;
       const T* ptr{};
    public:
       const T* data{};
       size_t size{};

       explicit MappedFile(const fs::path& file) {
         struct stat64 fs{};
         fd = open64(file.c_str(), O_RDONLY);
         if (fd != -1 && fstat64(fd, &fs) != -1) {
           size = static_cast<size_t>(fs.st_size) / sizeof(T);
           data = ptr = static_cast<T*>(mmap64(nullptr, fs.st_size, PROT_READ, MAP_SHARED, fd, 0));
        }
      }

       MappedFile(const MappedFile& other) = delete;

       MappedFile(MappedFile&& old) noexcept:
         fd(old.fd), ptr(old.ptr), data(old.data), size(old.size) {
         old.fd = -1;
         old.ptr = old.data = nullptr;
      }

      ~MappedFile() {
         if (data) munmap(const_cast<T*>(data), size * sizeof(T));
         if (fd != -1) close(fd);
      }

       auto end() const {
         return data + size;
      }

       operator const T*&() {
         return ptr;
      }
    };

    template <class Iter>
    void mergeSort(Iter* src, Iter* dst, size_t len, int max_thread = 1, int id = 1) {
       if (id == 1)
         std::copy_n(src, len, dst);
       if (len > 1) {
         std::thread t;
         size_t half = len / 2;
         if (id < max_thread) // 只在左子树开启新线程
           t = std::thread(mergeSort<Iter>, dst, src, half, max_thread, id * 2);
         else
           mergeSort(dst, src, half, max_thread, id * 2);
         mergeSort(dst + half, src + half, len - half, max_thread, id * 2 + 1);
         if (t.joinable())
           t.join();
         std::merge(src, src + half, src + half, src + len, dst);
      }
    }

    // 步骤 1:分块,返回块数
    int step1_SplitChunks(const fs::path& inFile) {

      // 映射源文件
       MappedFile text {inFile};
       if (!text) throw std::runtime_error("无法打开输入文件");

      // 分块,直到源文件结束
       int chunkCount = 0;
       for (auto chunkBegin = +text; (chunkBegin = text) < text.end();) {

        // 不断记录行,直到(此次遍历过的源文件大小 + 行数据数组大小 * 2 )到达内存限制
         vector<string_view> lines, sortedLines;
         while (text < text.end() && (text - chunkBegin + sizeof(string_view) * lines.size() * 2) < max_memory) {
           auto lineEnd = (char*) std::memchr(text, '\n', text.end() - text);
           auto lineLen = (lineEnd ? lineEnd : text.end()) - text;
           lines.emplace_back(text, lineLen);
           text += lineLen + 1;
        }

        // 准备写入(排序后)分块、行数据。
         ofstream chunkFile (tmpDir / (to_string(chunkCount) + ".txt"), ios::binary | ios::trunc);
         ofstream metaFile (tmpDir / (to_string(chunkCount) + ".meta"), ios::binary | ios::trunc);
         chunkCount++;

        // 多线程排序行数组
         sortedLines.resize(lines.size());
         mergeSort(lines.data(), sortedLines.data(), lines.size(), max_thread);

        // 保存(排序后)每行文本、偏移、长度
         for (auto line: sortedLines) {
           chunkFile << line;
           metaFile << Meta{line.data() - chunkBegin, line.size()};
        }

        // 检查
         if (!chunkFile || !metaFile) {
           std::stringstream buf;
           buf << "写入第 " << chunkCount << " 分块时出错!";
           throw std::runtime_error(buf.str());
        }
      }

       return chunkCount;
    }

    // 步骤 2:查找重复行
    void step2_FindDupLines(int chunkCount) {

       vector<ofstream> chunkDups;
       vector<MappedFile<>> chunkText;
       vector<MappedFile<Meta>> chunkMeta;
       std::priority_queue<Line, vector<Line>, std::greater<>> lines;

      // 映射所有分块的文本、行数据文件,
      // 也准备好记录各分块重复行数据的文件
       for (int idx = 0; idx < chunkCount; idx++) {
         chunkText.emplace_back(tmpDir / (to_string(idx) + ".txt"));
         chunkMeta.emplace_back(tmpDir / (to_string(idx) + ".meta"));
         chunkDups.emplace_back(tmpDir / (to_string(idx) + ".dups"), ios::binary | ios::trunc);
         lines.push({idx});
      }

      // 利用小根堆,按(行内容,分块号,偏移量)顺序,流式多路归并
       string_view last{};
       while (!lines.empty()) {

        // 与上一行相同,则将偏移量写入,对应分块待删除行名单内
         auto line = lines.top(); lines.pop();
         if (last == line.str && !last.empty())
           chunkDups[line.chunkIdx].write((char*)&line.offset, sizeof line.offset);
         last = line.str;

        // 该分块行数据未遍历完,则继续将下一行添加进小根堆中
         auto& text = chunkText[line.chunkIdx];
         auto& meta = chunkMeta[line.chunkIdx];
         if (meta < meta.end()) {
           lines.push({line.chunkIdx, (*meta).offset, {text, (*meta).length}});
           text += (*meta).length;
           meta++;
        }
      }

      // 检查
       for (auto&& file: chunkDups) {
         if (!file) {
           std::stringstream buf;
           buf << "保存第 " << chunkCount << " 分块删除名单时出错!";
           throw std::runtime_error(buf.str());
        }
      }
    }

    // 步骤 3:合并分块
    void step3_MergeChunks(int chunkCount, const fs::path& outFile) {

       ofstream textOut {outFile, ios::binary | ios::trunc};
       if (!textOut) throw std::runtime_error("无法打开输出文件");

       for (int idx = 0; idx < chunkCount; idx++) {

        // 映射分块(排序后)文本、行数据、删除名单
         MappedFile<> text {tmpDir / (to_string(idx) + ".txt")};
         MappedFile<Meta> meta {tmpDir / (to_string(idx) + ".meta")};
         MappedFile<decltype(Meta::offset)> dups {tmpDir / (to_string(idx) + ".dups")};

        // 剔除删除名单中的行
         vector<Line> lines; lines.reserve(meta.size);
         for (; meta < meta.end(); text += (*meta++).length) {
           if (dups < dups.end() && *dups == (*meta).offset)
             dups++;
           else
             lines.push_back({idx, (*meta).offset, {text, (*meta).length}});
        }

        // 再按偏移量顺序排序好
         std::sort(lines.begin(), lines.end(), [](auto&& a, auto&& b) {
           return a.offset < b.offset;
        });

        // 逐行输出
         for (auto&& line: lines)
           textOut << line.str << '\n';
      }

      // 检查
       if (!textOut)
         throw std::runtime_error("写入输出文件时出错!");
    }

    int main(int argc, const char* argv[]) {

       if (argc < 3) {
         std::stringstream buf;
         buf << "大文本去重并保持顺序工具\n\n"
          << "用法:" << argv[0] << " 输入文件 输出文件 "
          << "[内存限制 MB = " << (max_memory >> 20) << "] "
          << "[线程限制 = " << max_thread << "]";
         std::cerr << buf.str() << std::endl;
         return -1;
      }

       auto inFile = argv[1];
       auto outFile = argv[2];
       if (argc > 3) max_memory = (std::max)(std::stoull(argv[3]), 1ull) << 20ull;
       if (argc > 4) max_thread = (std::max)((std::min)(std::stoi(argv[4]), 256), 1);

       auto chunkCount = step1_SplitChunks(inFile);
       step2_FindDupLines(chunkCount);
       step3_MergeChunks(chunkCount, outFile);

      // 清空临时文件
       for (int i = 0; i < chunkCount; i++)
         for (auto&& suffix: {".txt", ".meta", ".dups"})
           fs::remove(tmpDir / (to_string(i) + suffix));
    }
    ```
    Keuin
        100
    Keuin  
       166 天前
    ```shell
    awk '{print $0","NR}' input.csv | sort | sed -E 's/,[0-9]+$//' | uniq
    ```

    Example usage:

    ```
    $ cat input
    1,2,3,4
    2,3,4,5
    3,4,5,6
    4,5,6,7
    2,3,4,5
    1,2,3,4
    5,6,7,8
    $ awk '{print $0","NR}' input
    1,2,3,4,1
    2,3,4,5,2
    3,4,5,6,3
    4,5,6,7,4
    2,3,4,5,5
    1,2,3,4,6
    5,6,7,8,7
    $ awk '{print $0","NR}' input | sort
    1,2,3,4,1
    1,2,3,4,6
    2,3,4,5,2
    2,3,4,5,5
    3,4,5,6,3
    4,5,6,7,4
    5,6,7,8,7
    $ awk '{print $0","NR}' input | sort | sed -E 's/,[0-9]+$//'
    1,2,3,4
    1,2,3,4
    2,3,4,5
    2,3,4,5
    3,4,5,6
    4,5,6,7
    5,6,7,8
    $ awk '{print $0","NR}' input | sort | sed -E 's/,[0-9]+$//' | uniq
    1,2,3,4
    2,3,4,5
    3,4,5,6
    4,5,6,7
    5,6,7,8
    ```

    不管你的电脑内存是 1T 还是 1G ,都可以正确运行并得到相同输出,因为 sort 命令用的是归并排序,是外存算法。如果你要限制用到的内存大小,把 sort 改成 sort --buffer-size=100M ,即可限制只用 100M 内存,其他命令都是行缓存算法,只会保存当前行在内存里,也就是说,最大内存用量是 max(100M, max_line_size_bytes)
    1  2  
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3380 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 43ms · UTC 10:58 · PVG 18:58 · LAX 02:58 · JFK 05:58
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.