本文介绍了多处理未派生所有请求的进程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在使用Oraflex(一个用于离岸分析的有限元软件,但不应该是相关的)。我创建了一个脚本来检查我执行的模拟是否已成功完成(模拟可能会因为没有达到收敛而失败)。因为我谈论的是数千个文件,所以我尝试使用multiprocessing
并行化该进程。下面是我的代码。很抱歉,我不能为您提供一个有效的示例,但我会尝试详细解释。我创建了multiprocessing.Process
的派生类,并覆盖了run()
以执行对模拟文件的检查。
然后,在__main__
中,我设置了一些处理器,相应地拆分文件,并开始执行。
问题是这些进程并没有完全派生,而是在一个进程到另一个进程的随机时间量内产生。这是理所当然的吗,还是我错过了什么? 我不完全产卵的意思是,我看到:
[Info/Worker-1] child process calling self.run()
例如:
[Info/Worker-4] child process calling self.run()
程序运行约10分钟后。
事先感谢您的帮助/建议。
import os
import subprocess
import glob
import multiprocessing
import logging
import sys
import OrcFxAPI as of
class Worker(multiprocessing.Process):
myJobs = []
def setJobs(self, jobList):
self.myJobs = jobList
@staticmethod
def changedExtensionFileName(oldFileName, newExtension):
return '.'.join((os.path.splitext(oldFileName)[0], newExtension))
def run(self):
failed = []
model = of.Model(threadCount=1)
for job in self.myJobs:
try:
print('%s starting' % job)
sys.stdout.flush()
model.LoadSimulation(job)
if model.state == of.ModelState.SimulationStoppedUnstable:
newJob = job.replace('.sim', '.dat')
failed.append(newJob)
with open('Failed_Sim.txt', 'a') as f:
f.write(f'{newJob}
')
f.close()
model.LoadData(newJob)
model.general.ImplicitConstantTimeStep /= 2
model.SaveData(newJob)
print(f'{job} has failed, reducing time step')
except of.DLLError as err:
print('%s ERROR: %s' % (job, err))
sys.stdout.flush()
with open(self.changedExtensionFileName(job, 'FAIL'), 'w') as f:
f.write('%s error: %s' % (job, err))
f.close()
return
if __name__ == '__main__':
import re
sim_file = [f for f in os.listdir() if re.search(r'dddd.*.sim', f)]
# begin multprocessing
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
corecount = 14
workers = []
chunkSize = int(len(sim_file) / corecount)
chunkRemainder = int(len(sim_file) % corecount)
print('%s jobs found, dividing across %s workers - %s each remainder %s' % (str(len(sim_file)), str(corecount), chunkSize, chunkRemainder))
start = 0
for coreNum in range(0, corecount):
worker = Worker()
workers.append(worker)
end = start + chunkSize
if chunkRemainder>0:
chunkRemainder -= 1
end += 1
if end>len(sim_file):
end = len(sim_file)
worker.setJobs(sim_file[start:end])
worker.start()
start = end
if start>=len(sim_file):
break
for worker in workers:
worker.join()
print('Done...')
推荐答案
好的,所以没有人举手来回答这个问题(我不知道怎么做!),所以来了一个更大的重组建议...
def worker(inpData):
#The worker process
failed1 = []
failed2 = []
for job in inpData: #I'm not sure of the data shape of the chunks, has your original method split them into coherent chunks capable of being processed independently? My step here could be wrong.
try:
#print('%s starting' % job) #Prints won't appear on console from worker processes from windows, so commented them all out
model.LoadSimulation(job)
if model.state == of.ModelState.SimulationStoppedUnstable:
newJob = job.replace('.sim', '.dat')
failed1.append(newJob)
#I'd recommend we pass the list "failed" back to master and write to text from there, otherwise you could have several processes updating the text file at once, leading to possible loss of data
#with open('Failed_Sim.txt', 'a') as f:
# f.write(f'{newJob}
')
# f.close()
model.LoadData(newJob)
model.general.ImplicitConstantTimeStep /= 2
model.SaveData(newJob)
#print(f'{job} has failed, reducing time step')
except of.DLLError as err:
#print('%s ERROR: %s' % (job, err))
#sys.stdout.flush()
#with open(self.changedExtensionFileName(job, 'FAIL'), 'w') as f:
# f.write('%s error: %s' % (job, err))
# f.close()
failed2.append(job)
#Note I've made two failed lists to pass back, for both failure types
return failed1, failed2
if __name__ == "__main__":
import re
import multiprocessing as mp
nCPUs = mp.cpu_count()
sim_file = [f for f in os.listdir() if re.search(r'dddd.*.sim', f)]
#Make the chunks
chunkSize = int(len(sim_file) / corecount)
chunkRemainder = int(len(sim_file) % corecount)
print('%s jobs found, dividing across %s workers - %s each remainder %s' % (str(len(sim_file)), str(corecount), chunkSize, chunkRemainder))
chunks = []
start = 0
for iChunk in range(0, nCPUs)
end = start + chunkSize
if chunkRemainder>0:
chunkRemainder -= 1
end += 1
if end>len(sim_file):
end = len(sim_file)
chunk.append(sim_file[start:end])
#Send to workers
pool = mp.Pool(processes=nCPUs)
futA = []
for iChunk in range(0, nCPUs):
futA.append(pool.apply_async(worker, args=(chunk[iChunk],))
#Gather results
if futA:
failedDat = []
failedSim = []
for iChunk in range(0, len(futA)):
resA, resB = futA[iChunk].get()
failedDat.extend(resA)
failedSim.extend(resB)
pool.close()
if failedDat:
print("Following jobs failed, reducing timesteps:")
print(failedDat)
if failedSim:
print("Following sims failed due to errors")
print(failedSim)
这篇关于多处理未派生所有请求的进程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!