V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
爱意满满的作品展示区。
jingwei8340885
V2EX  ›  分享创造

搞死 MPP 的时空碰撞问题

  •  
  •   jingwei8340885 · 2023-12-11 15:34:22 +08:00 · 1096 次点击
    这是一个创建于 404 天前的主题,其中的信息可能已经有所发展或是发生改变。

    问题描述

    时空碰撞定义

    某时间区间(例如 7 天)被分成多个固定时长(如 15 分钟)的时间切片,对象 a 和对象 b 在同一时间切片内的相同位置出现过,称为一次碰撞。

    规则 1:相同时间切片内,多次碰撞只记一次。

    规则 2:相同时间切片内,最后出现位置不同的称为不匹配,不匹配的时间切片数量不超过 20 时,(包括其它时间切片的)碰撞才被认为有效。

    要求:已知对象 a ,查找出指定时间区间内,满足两条规则下,与 a 发生有效碰撞次数最多的前 20 个对象 b 。

    数据结构与规模

    单一数据表,每天的数据量约 80 亿条记录,每个对象平均 1000 条记录,每条记录存储对象的时空信息(对象标识、时间戳、空间标记),当时间区间为 7 天时,总数据量有 560 亿行,数据结构如下:

    字段名称 字段类型 字段注释 示例数据
    no String 唯一对象标识 100000000009
    ct Int 时间戳,精确到秒 1690819200
    lac String 空间标记 1 40000
    ci String 空间标记 2 66000000

    no 由全数字构成。lac 、ci 总是一起出现,为了描述方便起见,我们可以把 lac 和 ci 并称为一个字段 loc ,已知 loc 去重计数的范围不超过 27 万。

    环境和期望

    在 5 台 64C256G 服务器构成的集群下,期望 1 小时内计算出结果,使用某世界知名 MPP 数据库无法达到预期。

    问题分析

    这个问题用关系数据库确实不容易快速计算,我们尝试用 SQL 写出不考虑规则 2 的运算:

    WITH DT AS ( SELECT DISTINCT no, loc, int(ct/15 分钟) as ct FROM T )
    SELECT TOP 20 * FROM
       ( SELECT B.no, COUNT(DISTINCT B.loc) cnt
       FROM DT AS A JOIN DT AS B ON A.loc=B.loc AND A.ct=B.ct
       WHERE A.no=a AND B.no<>a
       GROUP BY B.no)
    ORDER BY cnt DESC
    

    SQL 中的 DISTINCT 和 JOIN 计算会涉及 HASH 和比对,数据量很大时计算量也会很大,都会严重拖累性能。而且这些运算都涉及随机访问,通常要在内存进行,数据量太大还要使用缓存,性能更会急剧下降甚至可能溢出。仅是规则 1 用 SQL 计算已经很慢了,再加上规则 2 ,MPP 算不出来也不奇怪了。

    如果把对象 a 、b 在时间区间内的相关记录都取出成内存中的集合,然后来统计 a 和 b 发生有效碰撞的次数,并不会很困难。每个对象涉及的记录数并不多,即使 7 天区间也不到 1 万条,内存放下毫无压力。

    设 a 的记录集合是 A ,b 的是 B ,将 A 按时间切片分组为 A1,…,An ,B 分为 B1,…Bn 。所有 Ai,Bi 内成员都按 ct 从小到大排序。

    时间切片 i 内,a,b 发生(不考虑两条规则时的)碰撞的次数,可用

    Ci=Bi.count(Ai.(loc).contain(loc))
    

    计算出来,即统计 Bi 中有多少 loc 在 Ai 中出现过。

    不过,这种两层循环计算会较慢,而我们知道 a 以及 Ai 相对于 b 是确定的,这样可以事先对 Ai 中的 loc 去重后建索引,改为

    Ai’=Ai.id(loc).key@i(loc)
    Ci=Bi.switch@i(loc,Ai’).len()
    

    用 switch@i 过滤掉在 Ai 中找不到 loc 的 Bi 成员,同样可以得到碰撞次数。

    我们只要统计 Ci>0 的时间切片个数即可得到满足规则 1 的碰撞次数。

    类似地,可用

    Di=Ai.m(-1).loc!=Bi.m(-1).loc //m(-1)表示取集合的最后成员
    

    判断出在时间切片 i 中 a,b 是否发生过不匹配。

    有了 Ci,Di ,a,b 的有效碰撞次数就很容易计算了

    if(count(Di)<=20,count(Ci>0))
    

    剩下就是针对该值计算 TopN 的常规任务了。

    如果数据对 no,ct 有序,也很容易实现这个思路。A 可以用二分法一把取出,然后从头遍历对象 b ,因为数据有序,每次取出对应的 B 很容易。A 和 B 都对 ct 有序时,可以用有序分组计算出 Ai,Bi ,且保证上述 m(-1)的正确性。

    可惜关系数据库无法保证数据有序存储,也没有相关的有序计算方法,只能写出非常绕的嵌套 SQL 。

    SPL 有这种有序存储和相关的计算机制,容易实现。

    基于这个思路,还有一些工程上的优化手段。

    数据转换

    将 no 变成数,两个位置 lac 、ci 合并成一个 loc ,并且序号化(原来是字符串,数字化时就顺便处理为序号了)。

    转换后的数据结构如下

    字段名称 数据类型 字段含义 示例数据
    no Long 唯一对象标识 100000000009
    ct Int 时间戳,精确到秒 1690819200
    loc Int 空间标记 10282

    相比原数据结构,转存时做了以下两点变动:

    1 、将 lac 、ci 两个字段合并为 loc 字段,并转换成 Int 型序号。原 lac 、ci 作为维表单独存储。

    2 、将数字串 no 的数据类型变为 Long 型整数。

    关联与序号化

    前面分析中提到的每个时间切片的 Ai 建索引,但 Ai 太小了(平均长度在 10 左右),对于过小的集合使用索引的效果不明显。所以,我们在工程上改造成对整个 A (长度约有 1000 )建索引,这样要把时间切片序号 i 也加到主键上,大致代码:

    A’=A.derive((ct-st)\900:i).groups(i,loc).index()
    

    其中 st 是时间区间的起点,即每 900 秒分出一个时间切片。

    这时 Ci 的计算要变成先关联(过滤)再分组了:

    B.derive((ct-st)\900:i).join@i(i:loc,A).groups(i;count(1):C)
    

    这样就可以计算出以 i 和 Ci 为字段的序表,未碰撞的情况被 join@i 过滤掉了。

    join@i 使用索引实现关联过滤时,还是要计算 HASH 并比对,仍然有一定的计算量。其实我们知道,全部 i,loc 组合最多有 7 天96 (每天 96 个 15 分钟)27 万种可能,这并不是很大。如果用一个布尔值数组(序列)表示 A 在各个时间切片中是否在某个 loc 出现过,其长度最多也就是 79627 万,内存完全可以装得下。这样,我们就可以用对位序列技术来实现关联过滤,避免 HASH 计算和比对时间,能更快速地计算 Ci 。

    用 aloc 表示 A 的对位序列:

    aloc=A.align@a(672,(ct-st)\900+1).(x=270000.(false),~.run(x(loc)=true),x)
    

    因为有时间切片和位置两个维度,这里也使用了二层的对位序列。将 A 按时间切片分成 672 ( 7*96 )个 组,每组是个 27 万个布尔值成员的序列,对于时间切片 i 中在位置 loc 出现过的对象,可以简单地用 aloc(i)(loc)迅速判断出是否与 a 发生过碰撞(即 a 是否也在时间切片 i 中在位置 loc 出现过)。

    a 在每个时间切片的最后位置,也可以用一个序列表示为:

    alast=A.align@a(672,(ct-st)\900+1).(~.m(-1).loc)
    

    alast(i)就是 a 在时间切片 i 的最后位置,同样可以简单地用时间切片序号访问,以便快速计算 Di 。

    按天分表

    以上讲的算法要求数据对 no,ct 有序。但数据每天会新增,新增数据通常只会对 ct 有序甚至彻底无序。如果每次都要所有数据大排序就非常慢,即使只把新增数据排序再归并也要重写 560 亿行的数据,过于耗时。

    SPL 复组表可以将多个有序的组表逻辑上合并成一个更大的有序组表,这样每天一个分组表存储,计算时用复组表归并分表数据,归并后的数据也可以支持并行计算。避免了全量数据每天重写,复组表读取时会损失少量归并时间,但获得数据维护的灵活性还是值得的。

    当历史数据过期时,直接将相应日期的分表文件删除就可以了,非常简单。

    实践过程

    准备实验数据

    将数据按天存储,每天内数据 no 、ct 有序,保存为列存组表,例如将 7 天数据,分别存为:1.day.ctx,…,7.day.ctx ,由这 7 个分表可构成复组表,造数据脚本可以这样写:

    A B C
    1 =rand@s(1)
    2 for n =file("day"/A2/".btx")
    3 =movefile(B2)
    4 =elapse@s(sd,(A2-1)*86400)
    5 =long(B4)\1000
    6 for nm =1000000.new(100000000000+rand(8000000):no,int(B5+rand(86400)):ct,int(rand(270000)+1):loc)
    7 =B2.export@ab(C6)
    8 =file(A2/".day.ctx").create@py(#no,#ct,loc)
    9 =B2.cursor@b().sortx(#1,#2)
    10 >B8.append@i(B9)
    11 =movefile(B2)

    参数值有 3 个:

    1 、n ,几天,举例:1 ,代表 1 天

    2 、nm ,每天几百万,举例:1000 ,代表 10 亿

    3 、sd ,起始日期,举例:2023-08-01

    B8 建立组表时用了 @p 选项,表示按第一个字段 no 作为分段键。并行计算时需要对组表分段,不能把相同 no 的记录分到两段,使用 @p 选项可以在组表分段时保证这一点。

    计算脚本

    A
    1 =now()
    2 270000
    3 =n*24*3600\pt
    4 =file("day.ctx":to(n)).open()
    5 =A4.cursor@m(ct,loc;no==src_no).fetch().align@a(A3,(ct-st)\pt+1)
    6 =alast=A5.(~.m(-1).loc)
    7 =aloc=A5.(x=A2.(false),~.run(x(loc)=true),x)
    8 =A4.cursor@m(;no!=src_no).derive((ct-st)\pt+1:tn,aloc(tn)(loc):loca,alast(tn):lasta)
    9 =A8.group@s(no,tn;lasta,count(loca):cnt,top@1(-1,0,loc):lastb)
    10 =A9.group@s(no;count(cnt>0):cnt,count(lasta && lastb && lastb!=lasta):dcnt)
    11 =A10.select(cnt>0 && dcnt<=A3).total(top(-20;cnt))
    12 =file("app2_result.csv").export@ct(A11.new(src_no,no:dst_no,cnt:count))
    13 =interval@ms(A1,now())

    参数值有 4 个:

    1 、src_no 为对象 a 的特征号,举例:100000000009 2 、st 为起始时间戳(秒),举例:1690819200 ,对应 2023-08-01 00:00:00 3 、n 为统计天数,举例 7 4 、pt 为切片时间的秒数,举例 900

    A3:为统计时间区间内的总时间切片数

    A5:读出对象 a 的数据,产生时间切片序号并按该序号分组。组表对 no 有序时,用 no==src_no 的条件可以迅速定位到目标数据。

    A6:基于 A5 计算 a 在每个时间切片的最后空间值

    A7:基于 A5 计算 a 在对位序列,前面已经解释过计算原理

    A8:遍历其他(非 a 的)对象,生成时间切片序号 tn (使用新符号与 a 区别)。对于第一记录,从 aloc 中取出当前对象是否在时间切片 tn 和位置 loc 上和 a 发生碰撞记入 loca ,从 alast 中取出时间切片 tn 中对象 a 的最后空间值。

    A9:按对象和时间切片分组,可以用 lasta 计算每个对象在时间切片中与 a 的碰撞次数 cnt ,即前面分析的 Ci ;并计算出该对象在该时间切片中最后的 loc 记为 lastb 。

    A10:进一步按对象分组,计算出该对象与 a 的(考虑规则 1 后的)碰撞次数和不匹配次数。每个时间切片中的 Ci>0 即认定为一次碰撞,所以是 count(cnt>0),记入新的 cnt ;最后位置不同时计算一次不匹配,记入 dcnt 。

    A11:过滤掉无效碰撞的对象后取有效碰撞次数最多的前 20 名。这里做实验时采用的条件 dcnt<=A3 ,实际应该是 dcnt<=20 ,因为随机生成的数据中几乎没有 count(Di)<=20 的,就会算出空集。而 count(Di)最大值就是 A3 ,可以保证总能统计出结果。这样的计算量会比针对实际数据会更大,用于测试性能只会吃亏。

    编号化及还原

    以上代码在造数据时,是按 no 已经整数化,且 lac,ci 被合为序号来写的,实际上先要做转换整理,完成计算后还要还原。具体介绍可以参考:数据转存时的整数化

    实验效果

    SPL 使用单机( 8C64G ),计算总时间跨度 7 天(总数据量有 560 亿行),时间切片为 15 分钟,耗时为 121 秒。

    实际上,达到这个性能还会少量使用 SPL 企业版中的列式运算选项,但因为不涉及原理分析,这里就不详述了。

    后记

    这是个典型的对象统计问题,这类问题一般有如下几个特点:

    1. 统计满足某种条件的对象的个数

    2. 对象的数量非常多,但每个对象涉及的数据量不多

    3. 条件非常复杂,通常还和次序有关,需要一定的步骤才能判断出来

    面对这类问题,一个常见的思路就是把数据按对象排序,逐步取出每个对象的数据进入内存,再来做复杂的条件判断。

    现实中这种运算很常用,银行的帐户统计、电商的用户漏斗分析等等都是这种运算。

    SQL 很难实现这种运算,不保证有序存储,也缺乏有序运算,也很难写这些复杂判断。经常要写成很绕的嵌套语句,或者使用存储过程,无论如何,执行性能都会很差。

    SPL 则提供了有序存储及有序计算,也支持复杂的过程计算,能够很方便实现这类统计。

    SPL 资料

    1 条回复    2023-12-12 08:55:11 +08:00
    XhivaW
        1
    XhivaW  
       2023-12-12 08:55:11 +08:00
    这东西的宣传过去半年看了不知道多少次。。。 有用过的 v 友锐评一波吗?
    个人是感觉没啥兴趣也没啥需求
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2691 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 12:26 · PVG 20:26 · LAX 04:26 · JFK 07:26
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.