同时运行具有多个参数的函数并聚合复杂结果

Running Functions with Multiple Arguments Concurrently and Aggregating Complex Results(同时运行具有多个参数的函数并聚合复杂结果)
本文介绍了同时运行具有多个参数的函数并聚合复杂结果的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

设置

这是我发布的有关访问多个进程的结果的问题的第二部分。
有关第一部分,请单击此处:Link to Part One

我有一个复杂的数据集,需要同时与各种约束集进行比较,但我遇到了多个问题。第一个问题是从我的多个进程中获得结果,第二个问题是使任何东西都不能只是一个极其简单的函数来并发运行。

示例

我有多个约束集需要与某些数据进行比较,我希望同时执行此操作,因为我有很多约束集。在本例中,我将只使用两组约束。

Jupyter笔记本

创建一些约束和数据示例

# Create a set of constraints
constraints = pd.DataFrame([['2x2x2', 2,2,2],['5x5x5',5,5,5],['7x7x7',7,7,7]],
                     columns=['Name','First', 'Second', 'Third'])
constraints.set_index('Name', inplace=True)

# Create a second set of constraints
constraints2 = pd.DataFrame([['4x4x4', 4,4,4],['6x6x6',6,6,6],['7x7x7',7,7,7]],
                      columns=['Name','First', 'Second', 'Third'])
constraints2.set_index('Name', inplace=True)

# Create some sample data
items = pd.DataFrame([['a', 2,8,2],['b',5,3,5],['c',7,4,7]], columns=['Name','First', 'Second', 'Third'])
items.set_index('Name', inplace=True)

按顺序运行

如果我按顺序运行它,我可以得到我想要的结果,但对于我实际处理的数据,它可能需要12个小时以上。下面是按顺序运行的结果,这样您就可以知道我想要的结果是什么。

# Function
def seq_check_constraint(df_constraints_input, df_items_input):
    df_constraints = df_constraints_input.copy()
    df_items = df_items_input.copy()
    
    df_items['Product'] = df_items.product(axis=1)
    df_constraints['Product'] = df_constraints.product(axis=1)
    
    for constraint in df_constraints.index:
        df_items[constraint+'Product'] = df_constraints.loc[constraint,'Product']
        
    for constraint in df_constraints.index:
        for item in df_items.index:
                col_name = constraint+'_fits'
                df_items[col_name] = False
                df_items.loc[df_items['Product'] < df_items[constraint+'Product'], col_name] = True
    
    df_res = df_items.iloc[:: ,7:]
    return df_res
constraint_sets = [constraints, constraints2, ...]
results = {}
counter = 0

for df in constrain_sets:
    res = seq_check_constraint(df, items)
    results['constraints'+str(counter)] = res

或更丑:

df_res1 = seq_check_constraint(constraints, items)
df_res2 = seq_check_constraint(constraints2, items)

results = {'constraints0':df_res1, 'constraints1': df_res2}
作为按顺序运行这些命令的结果,我最终得到如下所示的:

我最终希望得到DataFrame的词典或列表,或者能够将DataFrame全部追加在一起。我获得结果的顺序对我来说并不重要,我只希望将它们全部放在一起,并需要能够对它们进行进一步分析。

我尝试的内容

这就引出了我对多处理的尝试,据我所知,您可以使用队列或管理器来处理共享数据和内存,但我两者都不能工作。我还在努力获取我的函数,该函数接受两个参数在Pool中执行。

以下是我的代码,目前使用的是上面的相同示例数据:

函数

def check_constraint(df_constraints_input, df_items_input):
    df_constraints = df_constraints_input.copy()
    df_items = df_items_input.copy()
    
    df_items['Product'] = df_items.product(axis=1)  # Mathematical Product
    df_constraints['Product'] = df_constraints.product(axis=1)
    
    for constraint in df_constraints.index:
        df_items[constraint+'Product'] = df_constraints.loc[constraint,'Product']
        
    for constraint in df_constraints.index:
        for item in df_items.index:
                col_name = constraint+'_fits'
                df_items[col_name] = False
                df_items.loc[df_items['Product'] < df_items[constraint+'Product'], col_name] = True
    
    df_res = df_items.iloc[:: ,7:]
    return df_res

Jupyter笔记本

df_manager = mp.Manager()
df_ns = df_manager.Namespace()
df_ns.constraint_sets = [constraints, constraints2]


print('---Starting pool---')

if __name__ == '__main__':
    with mp.Pool() as p:
        print('--In the pool--')
        res = p.map_async(mpf.check_constraint, (df_ns.constraint_sets, itertools.repeat(items)))
        print(res.get())

和我当前的错误:

TypeError: check_constraint() missing 1 required positional argument: 'df_items_input'

推荐答案

最简单的方法是创建一个元组列表(其中一个元组表示函数的一组参数)并将其传递给starmap

df_manager = mp.Manager()
df_ns = df_manager.Namespace()
df_ns.constraint_sets = [constraints, constraints2]


print('---Starting pool---')

if __name__ == '__main__':
    with mp.Pool() as p:
        print('--In the pool--')
        check_constraint_args = []
        for constraint in constraint_sets:
            check_constraint_args.append((constraint, items))
        res = p.starmap(mpf.check_constraint, check_constraint_args)
        print(res.get())

这篇关于同时运行具有多个参数的函数并聚合复杂结果的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

Leetcode 234: Palindrome LinkedList(Leetcode 234:回文链接列表)
How do I read an Excel file directly from Dropbox#39;s API using pandas.read_excel()?(如何使用PANDAS.READ_EXCEL()直接从Dropbox的API读取Excel文件?)
subprocess.Popen tries to write to nonexistent pipe(子进程。打开尝试写入不存在的管道)
I want to realize Popen-code from Windows to Linux:(我想实现从Windows到Linux的POpen-code:)
Reading stdout from a subprocess in real time(实时读取子进程中的标准输出)
How to call type safely on a random file in Python?(如何在Python中安全地调用随机文件上的类型?)