在APACHE BEAM中连接多个CSV文件

Concatenating multiple csv files in Apache Beam(在APACHE BEAM中连接多个CSV文件)
本文介绍了在APACHE BEAM中连接多个CSV文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用fileio.MatchFiles将几个csv文件转换为pd.DataFrame文件,然后将它们连接成一个csv文件。为此,我创建了两个ParDo类,将文件转换为DataFrame,然后将它们合并为merged csv。整个代码片段如下所示:
class convert_to_dataFrame(beam.DoFn):
    def process(self, element):
        return pd.DataFrame(element)

class merge_dataframes(beam.DoFn):
    def process(self, element):
        logging.info(element)
        logging.info(type(element))
        return pd.concat(element).reset_index(drop=True)

p = beam.Pipeline() 
concating = (p
             | beam.io.fileio.MatchFiles("C:/Users/firuz/Documents/task/mobilab_da_task/concats/**")
             | beam.io.fileio.ReadMatches()
             | beam.Reshuffle()
             | beam.ParDo(convert_to_dataFrame())
             | beam.combiners.ToList()
             | beam.ParDo(merge_dataframes())
             | beam.io.WriteToText('C:/Users/firuz/Documents/task/mobilab_da_task/output_tests/merged', file_name_suffix='.csv'))

p.run()
运行后,我在ParDO(merge_dataframes)上收到ValueError。我假设ReadMatches没有分配任何文件,或者ParDo(convert_to_dataFrame)没有返回任何对象。关于此方法或任何其他读取和合并文件的方法的任何想法。 错误输出:
ValueError:没有要串联的对象[在运行时 ‘ParDo(Merge_Dataframes)’]

Windows

要回答有关错误ValueError: No objects to concatenate [while running 'ParDo(merge_dataframes)'],的第一个问题,您在推荐答案文件系统上,需要使用分隔符 而不是 /。您可以使用os.path.join,并且不需要担心文件系统:

import os 
all_files1 = glob.glob(os.path.join(path1, "*.csv"))
对于与错误ValueError: DataFrame constructor not properly called! [while running 'ParDo(convert_to_dataFrame)'],有关的第二个问题,您向DataFrame构造函数发送的是另一种类型的dict值,而不是dict本身。这就是您收到该错误的原因。

您可以这样做:

DataFrame(eval(data))

这篇关于在APACHE BEAM中连接多个CSV文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

Leetcode 234: Palindrome LinkedList(Leetcode 234:回文链接列表)
How do I read an Excel file directly from Dropbox#39;s API using pandas.read_excel()?(如何使用PANDAS.READ_EXCEL()直接从Dropbox的API读取Excel文件?)
subprocess.Popen tries to write to nonexistent pipe(子进程。打开尝试写入不存在的管道)
I want to realize Popen-code from Windows to Linux:(我想实现从Windows到Linux的POpen-code:)
Reading stdout from a subprocess in real time(实时读取子进程中的标准输出)
How to call type safely on a random file in Python?(如何在Python中安全地调用随机文件上的类型?)