1
lsvih 2019-09-02 16:36:27 +08:00
为啥不能把合并好的 df 写出呢
|
2
allenloong OP @lsvih #1 会直接报 stack overflow
|
3
liprais 2019-09-02 16:38:54 +08:00
spark 写的时候报啥错
|
4
allenloong OP @liprais #3 stack overflow, 是在一个 fat node 上跑的,增加了 Executor memory 也不行
|
5
letking 2019-09-02 16:45:58 +08:00
用 saveAsTextFile 保存到 hdfs 文件夹(yarn 集群)或者本地文件夹(local 模式),然后直接 cat 把文件夹下所有文件写入一个文件就行了(要去除表头行)。
或者用 toLocalIterator 方法,把数据都收集到 driver 上然后写入到一个文件里。 |
6
wilimm 2019-09-02 16:47:15 +08:00
cat file1 file2 ... > file_merge
|
7
autogen 2019-09-02 17:11:05 +08:00
cat origin/*.cvs > merged.cvs
|
8
MMMMMMMMMMMMMMMM 2019-09-02 17:15:37 +08:00
|
9
allenloong OP @letking #5 saveAsTextFile 还是会报 stack overflow
|
10
momocraft 2019-09-02 17:26:44 +08:00
这格式是 tsv .. ?
可能某处的代码试图在内存中生成整个文件了 从例子看不出你的 merge 有多复杂,最坏情况总可以导入关系数据库,然后从 query 流成 tsv/csv |
11
allenloong OP @momocraft #10 是的,就是 tsv,合并的时候没问题,一往外写就出问题了。(第一次用 pyspark,可能真的是自己的问题。
BName = str(os.path.basename(bg_f[0]).split('.')[0]) schema = StructType([ StructField('CataID', StringType(), True), StructField('Start_Block', IntegerType(), True), StructField('End_Block', IntegerType(), True), StructField(BName, IntegerType(), True) ]) temp = sqlContext.read.csv(bg_f[0], sep='\t', header=False, schema=schema) for p in bg_f[1:]: SName = str(os.path.basename(p).split('.')[0]) schema = StructType([ StructField('CataID', StringType(), True), StructField('Start_Block', IntegerType(), True), StructField('End_Block', IntegerType(), True), StructField(BName, IntegerType(), True) ]) cur = sqlContext.read.csv(p, sep='\t', header=False, schema=schema) temp = temp.join(cur, on=['CataID', 'Start_Block', 'End_Block'], how='outer') temp = temp.drop('CataID', 'Start_Block', 'End_Block') |
12
letking 2019-09-02 17:40:40 +08:00
@allenloong spark 惰性运行的,合并操作可能没有立即执行而是在写结果的时候才执行。
你这问题可能是合并时内存就不够。 |
13
allenloong OP @letking #12 那有什么好的方法推荐吗?
|
14
liprais 2019-09-02 17:47:09 +08:00
temp = temp.join(cur,
on=['CataID', 'Start_Block', 'End_Block'], how='outer') temp = temp.drop('CataID', 'Start_Block', 'End_Block') 这一段好像达不到你想要的效果 |
15
optional 2019-09-02 17:47:36 +08:00
导入到数据库,然后再导出?
|
16
ytmsdy 2019-09-02 17:48:23 +08:00
都这么大数据量了,写个脚本搞到数据库里面,然后再从数据库里面导出来。
|
17
xypty 2019-09-02 17:49:59 +08:00
行名称一致的话,导入数据库,然后导出,找个 csv 导入导出最快的数据库就行了
|
18
allenloong OP @xypty #17 每个文件的行数不一样,也不一致,列名只有前三列是一样的。
|
19
allenloong OP @liprais #14 我是想合并的时候用前三列做参考,输出的时候再扔掉前三列。
|
20
allenloong OP @optional #15 每一个文件应该都要单独导入...列名不一样
|
21
xypty 2019-09-02 17:54:10 +08:00
@allenloong 可以考虑一下 mongo,mongo 我觉得还可以
|
22
tinybaby365 2019-09-02 17:56:52 +08:00
不知道为什么要合并,意义何在?如果为了方便存储管理,那就打成一个 tar 包。
实在想不到合并成一个大文件的好处,以后完全没法并行处理啊…… |
23
xomix 2019-09-02 17:57:17 +08:00
这种建议是第一个文件直接复制,第二和第三个文件直接从固定字节开始读取(从第一个文件判断表头长度)后追加到复制的文件后。
这样应该最快,再加上一些大文件复制的缓存等机制还能更快点。 |
24
letking 2019-09-02 17:58:49 +08:00
@allenloong 依次导入每个文件为一张表,然后 join 导入的表,保存成中间表。跟你现在的流程一样。
|
25
allenloong OP @tinybaby365 #22 因为想用合并好的表去做后面的计算,但是计算要求的表就是这样。
|
26
allenloong OP @letking #24 还是用 spark 或者任意数据库?
|
27
allenloong OP @xomix #23 没有太明白 XD
|
28
letking 2019-09-02 18:07:04 +08:00
@allenloong mysql 应该就可以。
spark 之所以这么搞不行是因为 spark 是把所有数据都加载的内存里处理的,而数据库是把数据存磁盘的。 如果你有 hadoop 集群的话可以用 spark 这么做。 |
29
allenloong OP @letking #28 明白了 我试试 谢谢你
|
30
corefx 2019-09-02 18:13:58 +08:00
楼上这么多回复,看的我一脸问号,斗胆问下下合并文件跟内存有什么关系?直接操作文件流,从源文件读取一行,向目标文件写入一行,全部内存开销就是 1 行文本啊!!!
|
31
aheadlead 2019-09-02 18:18:12 +08:00
分治?类似于归并排序?
|
33
cigarzh 2019-09-02 18:31:11 +08:00 via iPhone
上数据库
|
34
rrfeng 2019-09-02 18:48:02 +08:00
如果有序,就是个归并排序,可以边排边写。
如果无序,至少要把 index 存起来,可以用 hash 或者原始值,2000w 的 index 算一下内存需要多少? 最后按 hash 输出即可。 awk '{a[$1$2$3]+=$4}END{for(i in a) print i,a}' *.csv |
35
guyskk0x0 2019-09-03 09:53:25 +08:00 via Android
遍历所有行,按 hash(col1,2,3) % N 分组写到 N 个文件,相同的 key 都在同一个文件。
再对每个文件分别做合并。 最终结果直接 concat 即可。 |
36
allenloong OP @guyskk0x0 #35 emmmm 如果按照 hash 把文件进行了拆分,合并的时候怎么保证每个小文件的列能够对应上呢?
|
37
vimiix 2019-09-03 17:55:56 +08:00
尝试下 dpark?
|
38
cassidyhere 2019-09-03 18:29:29 +08:00
先读所有文件的第一行构造出最终的 columns,再遍历文件
|