How to install Python dependencies for Dataflow

Question

I have a very small Python Dataflow package, structured as follows:

.
├── __pycache__
├── pubsubtobigq.py
├── requirements.txt
└── venv

The contents of requirements.txt are:

protobuf==3.11.2
protobuf3-to-dict==0.1.5

I run the pipeline with the following command:

python -m pubsubtobigq \
  --input_topic "projects/project_name/topics/topic_name" \
  --job_name "job_name" \
  --output "gs://mybucket/wordcount/outputs" \
  --runner DataflowRunner \
  --project "project_name" \
  --region "us-central1" \
  --temp_location "gs://mybucket/tmp/" \
  --staging_location "gs://mybucket/staging" \
  --requirements_file requirements.txt \
  --streaming True

The code that uses this library is as follows:

from protobuf_to_dict import protobuf_to_dict

def parse_proto(message):
    # Convert the protobuf message into a plain Python dict.
    dictionary = protobuf_to_dict(message)
    return dictionary

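But that line fails, reporting that protobuf_to_dict is an unknown symbol (a NameError). I get the same error even when I use Google's built-in MessageToDict from google.protobuf.json_format instead.

How do I fix this? I need to install either one of these two libraries.

Edit

The error message when I use MessageToDict from google.protobuf.json_format: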

Error processing instruction -31. Original traceback is Traceback (most recent call last): File 
"apache_beam/runners/common.py", line 813, in apache_beam.runners.common.DoFnRunner.process File 
"apache_beam/runners/common.py", line 449, in 
apache_beam.runners.common.SimpleInvoker.invoke_process File "/Users/username/repos/
dataflow-pipeline/venv/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1415, in 
wrapper = lambda x: [fn(x)] File "/Users/username/repos/dataflow-pipeline/pubsubtobigq.py", line 
16, in parse_proto NameError: name 'MessageToDict' is not defined During handling of the above 
exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/
python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 143, in _execute response 
= task() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", 
line 193, in lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/
python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 291, in do_instruction 
request.instruction_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/
sdk_worker.py", line 317, in process_bundle bundle_processor.process_bundle(instruction_id)) File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 675, 
in process_bundle data.transform_id].process_encoded(data.data) File "/usr/local/lib/python3.7/
site-packages/apache_beam/runners/worker/bundle_processor.py", line 146, in process_encoded 
self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 258, in 
apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/
operations.py", line 259, in apache_beam.runners.worker.operations.Operation.output File 
"apache_beam/runners/worker/operations.py", line 146, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/
worker/operations.py", line 596, in apache_beam.runners.worker.operations.DoOperation.process File 
"apache_beam/runners/worker/operations.py", line 597, in 
apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", 
line 809, in apache_beam.runners.common.DoFnRunner.receive File "apache_beam/runners/common.py", 
line 815, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", 
line 882, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/
python3.7/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback raise 
exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 813, in 
apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 449, in 
apache_beam.runners.common.SimpleInvoker.invoke_process File "/Users/username/repos/
dataflow-pipeline/venv/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1415, in 
wrapper = lambda x: [fn(x)] File 
"/Users/username/repos/dataflow-pipeline/pubsubtobigq.py", line 16, in parse_proto NameError: 
name 'MessageToDict' is not defined [while running 'generatedPtransform-23']

The error message when I use protobuf_to_dict:

Error processing instruction -32. Original traceback is Traceback (most recent call last): File 
"apache_beam/runners/common.py", line 813, in apache_beam.runners.common.DoFnRunner.process File 
"apache_beam/runners/common.py", line 449, in 
apache_beam.runners.common.SimpleInvoker.invoke_process File "/Users/username/repos/
dataflow-pipeline/venv/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1415, in 
wrapper = lambda x: [fn(x)] File "/Users/username/repos/dataflow-pipeline/pubsubtobigq.py", line 
21, in parse_proto NameError: name 'protobuf_to_dict' is not defined During handling of the above 
exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/
python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 143, in _execute response 
= task() File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", 
line 193, in lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/
python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 291, in do_instruction 
request.instruction_id) File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/
sdk_worker.py", line 317, in process_bundle bundle_processor.process_bundle(instruction_id)) File 
"/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 675, 
in process_bundle data.transform_id].process_encoded(data.data) File "/usr/local/lib/python3.7/
site-packages/apache_beam/runners/worker/bundle_processor.py", line 146, in process_encoded 
self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 258, in 
apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/
operations.py", line 259, in apache_beam.runners.worker.operations.Operation.output File 
"apache_beam/runners/worker/operations.py", line 146, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/
worker/operations.py", line 596, in apache_beam.runners.worker.operations.DoOperation.process File 
"apache_beam/runners/worker/operations.py", line 597, in 
apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", 
line 809, in apache_beam.runners.common.DoFnRunner.receive File "apache_beam/runners/common.py", 
line 815, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", 
line 882, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/
python3.7/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback raise 
exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 813, in 
apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 449, in 
apache_beam.runners.common.SimpleInvoker.invoke_process File "/Users/username/repos/
dataflow-pipeline/venv/lib/python3.7/site-packages/apache_beam/transforms/core.py", line 1415, in 
wrapper = lambda x: [fn(x)] File "/Users/username/repos/dataflow-pipeline/pubsubtobigq.py", line 
21, in parse_proto NameError: name 'protobuf_to_dict' is not defined [while running 
'generatedPtransform-22']

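Recommended answer

Dataflow workers cannot see global dependencies: by default, names imported at module level in the main session are not saved when the job is serialized, so they are undefined when the function runs on a worker. See https://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors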
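The effect can be imitated with the standard library alone. The sketch below is only an analogy and not Beam's actual serialization: it rebuilds a function with an empty global namespace, which is roughly the situation of a worker that received the function but none of the launcher's module-level names. The helpers `run_without_main_globals` and `outcome` are hypothetical, and `json` stands in for `protobuf_to_dict`.

```python
import builtins
import json  # module-level import: bound in this module's globals
import types


def parse_global(message):
    # Looks up the global name `json` at call time.
    return json.loads(message)


def parse_local(message):
    # Binds `json` inside the function body on every call.
    import json
    return json.loads(message)


def run_without_main_globals(fn, arg):
    """Rebuild fn from its code object with an empty global namespace.

    Hypothetical stand-in for a worker that received the function
    but none of the launcher's module-level names.
    """
    fresh_globals = {"__builtins__": builtins}
    rebuilt = types.FunctionType(fn.__code__, fresh_globals, fn.__name__)
    return rebuilt(arg)


def outcome(fn, arg):
    # Return the parsed value, or a short description of the NameError.
    try:
        return run_without_main_globals(fn, arg)
    except NameError as exc:
        return "NameError: " + str(exc)


print(outcome(parse_global, '{"a": 1}'))
print(outcome(parse_local, '{"a": 1}'))
```

The first call reports a NameError because `json` is missing from the rebuilt function's globals, while the second succeeds because the import runs inside the function itself.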

As @quimiluzon suggested, first check whether your job works with DirectRunner. If it does, moving the import inside the function may work:


def parse_proto(message):
    # Import inside the function so the name is resolved on the worker.
    from protobuf_to_dict import protobuf_to_dict
    dictionary = protobuf_to_dict(message)
    return dictionary
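The FAQ linked above also describes a second option: keeping the imports at module level and enabling the `--save_main_session` pipeline option, so that the main session's global namespace is pickled and restored on the workers. A minimal sketch of adding that flag to the argument list, assuming the arguments are later passed to Beam's `PipelineOptions`; the helper `with_save_main_session` is hypothetical and not part of Beam:

```python
def with_save_main_session(argv):
    """Return a copy of argv that contains --save_main_session exactly once."""
    flag = "--save_main_session"
    args = list(argv)
    if flag not in args:
        args.append(flag)
    return args


pipeline_args = with_save_main_session([
    "--runner", "DataflowRunner",
    "--requirements_file", "requirements.txt",
])

# Assumption: these arguments would then be handed to Beam, for example
# apache_beam.options.pipeline_options.PipelineOptions(pipeline_args).
print(pipeline_args)
```

The same flag can be appended directly to the command line instead. The function-local import is the narrower change, since saving the main session pickles every global in the launching module.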
