问题描述
我正在编写并行运行实验的代码。我无法控制实验的操作,它们可能会使用subprocess.Popen
或check_output
打开以运行一个或多个附加子进程。
我有两个条件:我希望能够终止超过超时的实验,并且我希望在KeyboardInterrupt
上终止实验。
大多数终止进程的方法都不能确保终止所有子进程等。如果一个接一个地运行100个实验,这显然是一个问题,但它们都会派生在超时发生后仍然存活的子进程,并且实验应该被终止。
我现在处理这一问题的方法是包括将实验配置存储在数据库中的代码,生成从命令行加载和运行实验的代码,然后通过subprocess.Popen(cmd, shell=True, start_new_session=True)
调用这些命令,并使用os.killpg
on Timeout终止它们。
我的主要问题是:通过命令行调用这些实验感觉很麻烦,那么有没有办法直接通过multiprocessing.Process(target=fn)
调用代码,并在超时时达到start_new_session=True
+os.killpg
的效果和KeyboardInterrupt
?
<file1>
def run_exp(config):
do work
return result
if __name__ == "__main__":
save_exp(run_exp(load_config(sys.args)))
<file2>
def monitor(queue):
active = set() # active process ids
while True:
msg = queue.get()
if msg == "sentinel":
<loop over active ids and kill them with os.killpg>
else:
<add or remove id from active set>
def worker(args):
id, queue = args
command = f"python <file1> {id}"
with subprocess.Popen(command, shell=True, ..., start_new_session=True) as process:
try:
queue.put(f"start {process.pid}")
process.communicate(timeout=timeout)
except TimeoutExpired:
os.killpg(process.pid, signal.SIGINT) # send signal to the process group
process.communicate()
finally:
queue.put(f"done {process.pid}")
def main():
<save configs => c_ids>
queue = manager.Queue()
process = Process(target=monitor, args=(queue,))
process.start()
def clean_exit():
queue.put("sentinel")
<terminate pool and monitor process>
r = pool.map_async(worker, [(c_id, queue) for c_id in c_ids])
atexit.register(clean_exit)
r.wait()
<terminate pool and monitor process>
我发布了代码框架,详细介绍了通过命令行启动进程并终止它们的方法。我的方法的那个版本的另一个复杂性是,当KeyboardInterrupt
到达时,队列已经终止(因为没有更好的词),并且不可能与监视进程通信(哨兵消息永远不会到达)。相反,我不得不将进程ID写入文件,并在主进程中读回该文件,以终止仍在运行的进程。如果您知道解决此队列问题的方法,我很想了解一下。
推荐答案
我认为问题是您正在存储子进程ID以终止它,您需要主机进程ID,而您使用了signal.SIGINT
,我认为应该是signal.SIGTERM
。试试这个,而不是这一行:
os.killpg(process.pid, signal.SIGINT)
使用此行:
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
这篇关于管理和终止任何进程的可靠方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!