如何使用PYSPARK从Spark获得批次行

How do you get batches of rows from Spark using pyspark(如何使用PYSPARK从Spark获得批次行)
本文介绍了如何使用PYSPARK从Spark获得批次行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个包含60多亿行数据的Spark RDD,我想使用Train_on_Batch来训练深度学习模型。我不能将所有行都放入内存中,所以我希望一次获得10K左右的内存,以批处理成64或128个的块(取决于型号大小)。我目前使用的是rdd.Sample(),但我认为这不能保证我会得到所有行。有没有更好的方法来划分数据,使其更易于管理,这样我就可以编写一个生成器函数来获取批处理?我的代码如下:

data_df = spark.read.parquet(PARQUET_FILE)
print(f'RDD Count: {data_df.count()}') # 6B+
data_sample = data_df.sample(True, 0.0000015).take(6400) 
sample_df = data_sample.toPandas()

def get_batch():
  for row in sample_df.itertuples():
    # TODO: put together a batch size of BATCH_SIZE
    yield row

for i in range(10):
    print(next(get_batch()))

推荐答案

我不相信Spark会让您对数据进行偏移或分页。

但您可以添加索引,然后对其进行分页,首先:

    from pyspark.sql.functions import lit
    data_df = spark.read.parquet(PARQUET_FILE)
    count = data_df.count()
    chunk_size = 10000

    # Just adding a column for the ids
    df_new_schema = data_df.withColumn('pres_id', lit(1))

    # Adding the ids to the rdd 
    rdd_with_index = data_df.rdd.zipWithIndex().map(lambda (row,rowId): (list(row) + [rowId+1]))

    # Creating a dataframe with index
    df_with_index = spark.createDataFrame(chunk_rdd,schema=df_new_schema.schema)

    # Iterating into the chunks
    for chunk_size in range(0,count+1 ,chunk_size):
        initial_page = page_num*chunk_size
        final_page = initial_page + chunk_size 
        where_query = ('pres_id > {0} and pres_id <= {1}').format(initial_page,final_page)
        chunk_df = df_with_index.where(where_query).toPandas()
        train_on_batch(chunk_df) # <== Your function here        

这不是最佳方案,因为使用了 pandas 数据帧,它会严重利用Spark,但会解决您的问题。

如果这会影响您的功能,请不要忘记删除ID。

这篇关于如何使用PYSPARK从Spark获得批次行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

Leetcode 234: Palindrome LinkedList(Leetcode 234:回文链接列表)
How do I read an Excel file directly from Dropbox#39;s API using pandas.read_excel()?(如何使用PANDAS.READ_EXCEL()直接从Dropbox的API读取Excel文件?)
subprocess.Popen tries to write to nonexistent pipe(子进程。打开尝试写入不存在的管道)
I want to realize Popen-code from Windows to Linux:(我想实现从Windows到Linux的POpen-code:)
Reading stdout from a subprocess in real time(实时读取子进程中的标准输出)
How to call type safely on a random file in Python?(如何在Python中安全地调用随机文件上的类型?)