多处理未派生所有请求的进程

Multiprocessing not spawning all the requested processes(多处理未派生所有请求的进程)
本文介绍了多处理未派生所有请求的进程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Oraflex(一个用于离岸分析的有限元软件,但不应该是相关的)。我创建了一个脚本来检查我执行的模拟是否已成功完成(模拟可能会因为没有达到收敛而失败)。因为我谈论的是数千个文件,所以我尝试使用multiprocessing并行化该进程。下面是我的代码。很抱歉,我不能为您提供一个有效的示例,但我会尝试详细解释。我创建了multiprocessing.Process的派生类,并覆盖了run()以执行对模拟文件的检查。 然后,在__main__中,我设置了一些处理器,相应地拆分文件,并开始执行。

问题是这些进程并没有完全派生,而是在一个进程到另一个进程的随机时间量内产生。这是理所当然的吗,还是我错过了什么? 我不完全产卵的意思是,我看到:

[Info/Worker-1] child process calling self.run()

例如:

[Info/Worker-4] child process calling self.run()

程序运行约10分钟后。

事先感谢您的帮助/建议。

import os
import subprocess
import glob
import multiprocessing
import logging
import sys
import OrcFxAPI as of

class Worker(multiprocessing.Process):

    myJobs = []

    def setJobs(self, jobList):
        self.myJobs = jobList

    @staticmethod
    def changedExtensionFileName(oldFileName, newExtension):
        return '.'.join((os.path.splitext(oldFileName)[0], newExtension))

    def run(self):
        failed = []
        model = of.Model(threadCount=1)

        for job in self.myJobs:
            try:
                print('%s starting' % job)
                sys.stdout.flush()
                model.LoadSimulation(job)
                if model.state == of.ModelState.SimulationStoppedUnstable:
                    newJob = job.replace('.sim', '.dat')
                    failed.append(newJob)

                    with open('Failed_Sim.txt', 'a') as f:
                        f.write(f'{newJob}
')
                        f.close()

                    model.LoadData(newJob)
                    model.general.ImplicitConstantTimeStep /= 2
                    model.SaveData(newJob)
                    print(f'{job} has failed, reducing time step')

            except of.DLLError as err:
                print('%s ERROR: %s' % (job, err))
                sys.stdout.flush()
                with open(self.changedExtensionFileName(job, 'FAIL'), 'w') as f:
                    f.write('%s error: %s' % (job, err))
                    f.close()
        return



if __name__ == '__main__':
    import re
    sim_file = [f for f in os.listdir() if re.search(r'dddd.*.sim', f)]    

    # begin multprocessing
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    corecount = 14 

    workers = []

    chunkSize = int(len(sim_file) / corecount)
    chunkRemainder = int(len(sim_file) % corecount)
    print('%s jobs found, dividing across %s workers - %s each remainder %s' % (str(len(sim_file)), str(corecount), chunkSize, chunkRemainder))

    start = 0
    for coreNum in range(0, corecount):
        worker = Worker()
        workers.append(worker)
        end = start + chunkSize
        if chunkRemainder>0:
            chunkRemainder -= 1
            end += 1
        if end>len(sim_file):
            end = len(sim_file)
        worker.setJobs(sim_file[start:end])
        worker.start()
        start = end
        if start>=len(sim_file):
            break

    for worker in workers:
        worker.join()
    print('Done...')

推荐答案

好的,所以没有人举手来回答这个问题(我不知道怎么做!),所以来了一个更大的重组建议...

def worker(inpData):
    #The worker process

    failed1 = []
    failed2 = []

    for job in inpData:   #I'm not sure of the data shape of the chunks, has your original method split them into coherent chunks capable of being processed independently? My step here could be wrong. 
        try:
            #print('%s starting' % job)  #Prints won't appear on console from worker processes from windows, so commented them all out
            model.LoadSimulation(job)
            if model.state == of.ModelState.SimulationStoppedUnstable:
                newJob = job.replace('.sim', '.dat')
                failed1.append(newJob)

                #I'd recommend we pass the list "failed" back to master and write to text from there, otherwise you could have several processes updating the text file at once, leading to possible loss of data
                #with open('Failed_Sim.txt', 'a') as f:
                #     f.write(f'{newJob}
')
                #     f.close()

                model.LoadData(newJob)
                model.general.ImplicitConstantTimeStep /= 2
                model.SaveData(newJob)
                #print(f'{job} has failed, reducing time step')   

            except of.DLLError as err:
                #print('%s ERROR: %s' % (job, err))
                #sys.stdout.flush()
                #with open(self.changedExtensionFileName(job, 'FAIL'), 'w') as f:
                #    f.write('%s error: %s' % (job, err))
                #    f.close()
                failed2.append(job)

#Note I've made two failed lists to pass back, for both failure types

return failed1, failed2


if __name__ == "__main__":
    import re
    import multiprocessing as mp
    nCPUs = mp.cpu_count()

    sim_file = [f for f in os.listdir() if re.search(r'dddd.*.sim', f)] 

    #Make the chunks
    chunkSize = int(len(sim_file) / corecount)
    chunkRemainder = int(len(sim_file) % corecount)
    print('%s jobs found, dividing across %s workers - %s each remainder %s' % (str(len(sim_file)), str(corecount), chunkSize, chunkRemainder))

    chunks = []
    start = 0
    for iChunk in range(0, nCPUs)
        end = start + chunkSize
        if chunkRemainder>0:
            chunkRemainder -= 1
            end += 1
            if end>len(sim_file):
                end = len(sim_file)
        chunk.append(sim_file[start:end])


    #Send to workers
    pool = mp.Pool(processes=nCPUs)
    futA = []

    for iChunk in range(0, nCPUs):
        futA.append(pool.apply_async(worker, args=(chunk[iChunk],))
    

    #Gather results
    if futA:
        failedDat = []
        failedSim = []
        for iChunk in range(0, len(futA)):
            resA, resB = futA[iChunk].get()
            failedDat.extend(resA)
            failedSim.extend(resB)
    pool.close()
            
    if failedDat:
        print("Following jobs failed, reducing timesteps:")
        print(failedDat)
    if failedSim:
        print("Following sims failed due to errors")
        print(failedSim) 

这篇关于多处理未派生所有请求的进程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

Leetcode 234: Palindrome LinkedList(Leetcode 234:回文链接列表)
How do I read an Excel file directly from Dropbox#39;s API using pandas.read_excel()?(如何使用PANDAS.READ_EXCEL()直接从Dropbox的API读取Excel文件?)
subprocess.Popen tries to write to nonexistent pipe(子进程。打开尝试写入不存在的管道)
I want to realize Popen-code from Windows to Linux:(我想实现从Windows到Linux的POpen-code:)
Reading stdout from a subprocess in real time(实时读取子进程中的标准输出)
How to call type safely on a random file in Python?(如何在Python中安全地调用随机文件上的类型?)