What is the best practice for collecting a large data set from a Spark RDD?

2023-09-06 05:08:16 Author: 迷糊少女

I am using PySpark to process my data, and at the very end I need to collect the data from an RDD using rdd.collect(). However, Spark crashes because of a memory problem. I have tried a number of approaches, but with no luck. I am now running the following code, which processes a small chunk of data, one partition at a time:

def make_part_filter(index):
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter

for part_id in range(rdd.getNumPartitions()):
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    myCollection = part_rdd.collect()
    for row in myCollection:
        # Do something with each row
        pass


The new code I am currently using does not crash, but it seems to run forever.

Is there a better way to collect data from a large rdd?



I don't know if this is the best way, but it's the best way I've tried. I'm not sure whether it's better or worse than yours. It's the same idea, splitting the RDD into chunks, but you can be more flexible with the chunk size.

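def rdd_iterate(rdd, chunk_size=1000000):
    # Pair every row with its global index; cache the result so each
    # chunk's filter reuses it instead of recomputing the whole RDD.
    indexed_rows = rdd.zipWithIndex().cache()
    count = indexed_rows.count()
    print("Will iterate through RDD of count {}".format(count))
    start = 0
    end = start + chunk_size
    while start < count:
        print("Grabbing new chunk: start = {}, end = {}".format(start, end))
        # Collect only the rows whose index falls in [start, end).
        chunk = indexed_rows.filter(lambda r: start <= r[1] < end).collect()
        for row in chunk:
            yield row[0]
        start = end
        end = start + chunk_size
    # Release the cached copy once every chunk has been consumed.
    indexed_rows.unpersist()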
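
To make the chunking concrete, here is a small sketch in plain Python (no Spark needed) of the index windows that the loop above walks through. chunk_windows is a hypothetical helper name, not part of PySpark, and the row count is made up for illustration.

```python
def chunk_windows(count, chunk_size):
    """Yield the half-open [start, end) index windows used for chunking."""
    start = 0
    while start < count:
        yield start, start + chunk_size
        start += chunk_size

# Hypothetical numbers: 2.5 million rows with the default chunk size.
windows = list(chunk_windows(2500000, 1000000))
print(windows)
# [(0, 1000000), (1000000, 2000000), (2000000, 3000000)]
```

Each window costs one filter pass over the cached RDD, so a smaller chunk size means less driver memory per chunk but more passes overall.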

Example usage where I want to append a huge RDD to a CSV file on disk without ever populating a Python list with the entire RDD:

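def rdd_to_csv(fname, rdd):
    import csv
    import os
    # open() does not expand "~", so expand it explicitly.
    with open(os.path.expanduser(fname), "a", newline="") as f:
        c = csv.writer(f)
        for row in rdd_iterate(rdd):  # with abstraction, iterates through entire RDD
            c.writerow(row)

rdd_to_csv("~/test.csv", my_really_big_rdd)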
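
It may also be worth trying PySpark's built-in RDD.toLocalIterator(), which brings rows to the driver one partition at a time, so the driver needs roughly as much memory as the largest partition rather than the whole RDD. The following is only a sketch under that assumption, not a tested replacement for the chunked version; rdd_to_csv_local is a hypothetical name, and it assumes each row is a list or tuple.

```python
import csv
import os

def rdd_to_csv_local(fname, rdd):
    """Append every row of `rdd` to a CSV file, one partition at a time."""
    # open() does not expand "~", so do it explicitly.
    with open(os.path.expanduser(fname), "a", newline="") as f:
        writer = csv.writer(f)
        # toLocalIterator() fetches partitions to the driver one by one
        # instead of materializing the whole RDD the way collect() does.
        for row in rdd.toLocalIterator():
            writer.writerow(row)  # assumes each row is a list or tuple
```

If a single partition is still too large for the driver, repartitioning into more, smaller partitions first (for example with rdd.repartition(n)) should reduce the size of each piece.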