Initializing task module global in Dask worker using --preload?


Problem description

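I'm trying to implement something similar to these questions (Initializing state on dask-distributed workers, Setting up Dask worker with variable): I have a (relatively) large model that I want to pre-initialize on the subset of workers that accept tasks needing that model. Ideally, I don't even want the client machine to have the model.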

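Before I found those questions, my first attempt was to define a delayed task in a shared module, worker_tasks.model, and have the worker's --preload script assign a module global (e.g. worker_tasks.model.model) for that task to use. For some reason this doesn't work: the variable is set in the preload script, but it is still None when the task is called.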

init_model_worker.py:

import logging
from uuid import uuid4

from worker_tasks import model


def dask_setup(worker):
    model.model = f'<mock model {uuid4()}>'

    logger = logging.getLogger('distributed')
    logger.warning(f'model = {model.model}')

worker_tasks/model.py:

import logging
import random
from time import sleep
from uuid import uuid4

import dask

model = None


@dask.delayed
def compute_clinical(inp):        
    if model is None:
        raise RuntimeError('Model not initialized.')

    sleep(random.uniform(3, 17))

    return {
        'result': random.choice((True, False)),
        'confidence': random.uniform(0, 1)
        }

Here is the worker log when I start it and submit something to the scheduler:

> dask-worker --preload init_model_worker.py tcp://scheduler:8786 --name model-worker
distributed.utils - INFO - Reload module init_model_worker from .py file                                  
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.28.0.4:41743'                         
distributed.diskutils - INFO - Found stale lock file and directory '/worker-epptq9sh', purging      
distributed.utils - INFO - Reload module init_model_worker from .py file                                  
distributed - WARNING - model = <mock model faa41af0-d925-46ef-91c9-086093d37c71>                   
distributed.worker - INFO -       Start worker at:     tcp://172.28.0.4:37973                       
distributed.worker - INFO -          Listening to:     tcp://172.28.0.4:37973                       
distributed.worker - INFO -              nanny at:           172.28.0.4:41743                       
distributed.worker - INFO -              bokeh at:           172.28.0.4:37766                       
distributed.worker - INFO - Waiting to connect to:       tcp://scheduler:8786                       
distributed.worker - INFO - -------------------------------------------------                       
distributed.worker - INFO -               Threads:                          4                       
distributed.worker - INFO -                Memory:                    1.93 GB                       
distributed.worker - INFO -       Local Directory:           /worker-mhozo9ru                       
distributed.worker - INFO - -------------------------------------------------                       
distributed.worker - INFO -         Registered to:       tcp://scheduler:8786                       
distributed.worker - INFO - -------------------------------------------------                       
distributed.core - INFO - Starting established connection                                           
distributed.worker - WARNING -  Compute Failed                                                      
Function:  compute_clinical                                                                         
args:      ('mock')                                                                                 
kwargs:    {}                                                                                       
Exception: RuntimeError('Model not initialized.')                                                   

You can see that after the preload script is reloaded, model is <mock model faa41af0-d925-46ef-91c9-086093d37c71>, but when I try to use it from within the task, it is None.
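One plausible explanation, stated as an assumption rather than something the log confirms: because @dask.delayed replaces the module attribute compute_clinical with a Delayed wrapper, the underlying function likely cannot be pickled by reference (module name plus function name), so Dask's serializer (cloudpickle) ships it by value, together with a copy of the globals it uses as they were on the client, where model is still None. The stdlib-only sketch below simulates that effect in one process; fake_worker_tasks_model and copy_by_value are hypothetical stand-ins, not Dask or cloudpickle APIs.

```python
import builtins
import types

# Hypothetical in-memory stand-in for worker_tasks/model.py.
mod = types.ModuleType("fake_worker_tasks_model")
exec('''
model = None

def compute_clinical(inp):
    if model is None:
        raise RuntimeError("Model not initialized.")
    return {"input": inp, "model": model}
''', mod.__dict__)


def copy_by_value(func):
    """Hypothetical helper: rebuild func with a fresh globals dict holding a
    snapshot of the globals its code references (a rough simulation of
    by-value serialization)."""
    snapshot = {name: func.__globals__[name]
                for name in func.__code__.co_names
                if name in func.__globals__}
    snapshot["__builtins__"] = builtins.__dict__
    return types.FunctionType(func.__code__, snapshot, func.__name__)


# "Client": the function is copied while model is still None.
shipped = copy_by_value(mod.compute_clinical)

# "Preload script": rebinds the module global afterwards.
mod.model = "<mock model>"

# The module's own function reads the live module globals and sees the model.
print(mod.compute_clinical("x"))  # {'input': 'x', 'model': '<mock model>'}

# The copy only has its snapshot, so it still sees None.
try:
    shipped("x")
except RuntimeError as exc:
    print("copy failed:", exc)  # copy failed: Model not initialized.
```

In the real setup the client and the worker are separate processes, but the outcome would be the same: the preload script updates the worker's copy of the module, while the shipped function consults its own snapshot.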

I'll try implementing a solution based on the answers to those other questions, but I have a couple of questions about worker preloading:

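  1. Why is model None when the task is called, even though it was assigned in the preload script?
  2. Is it generally advisable to avoid doing this kind of thing in a worker --preload script? Would it be better to trigger initialization of the worker state from the client? If so, why?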

Recommended answer

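I suspect the model variable is bound into your function immediately, at the moment the function gets serialized. You could try something like the following: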

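import dask

@dask.delayed
def compute_clinical(inp):
    # Import at call time so the worker's module attribute (set by the preload
    # script) is read, rather than a value captured during serialization.
    from worker_tasks.model import model

    if model is None:
        raise RuntimeError('Model not initialized.')

    ...  # rest of the task as in the question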
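Why moving the import into the function body might help can be sketched with the same kind of simulation, again using a hypothetical in-memory module in place of worker_tasks.model. An import statement resolves the module through sys.modules when it runs, so even a function carrying a stale snapshot of its globals would read the attribute the preload step set. This is an illustration under that assumption, not Dask's actual serialization code.

```python
import builtins
import sys
import types

# Hypothetical in-memory stand-in for worker_tasks/model.py, registered in
# sys.modules so an import statement can find it like a real module.
mod = types.ModuleType("fake_worker_tasks_model")
sys.modules["fake_worker_tasks_model"] = mod
exec('''
model = None

def compute_clinical(inp):
    # Resolve the module attribute at call time instead of relying on the
    # function's own (possibly copied) globals.
    from fake_worker_tasks_model import model
    if model is None:
        raise RuntimeError("Model not initialized.")
    return {"input": inp, "model": model}
''', mod.__dict__)


def copy_by_value(func):
    """Hypothetical helper: rebuild func with a snapshot of its globals."""
    snapshot = {name: func.__globals__[name]
                for name in func.__code__.co_names
                if name in func.__globals__}
    snapshot["__builtins__"] = builtins.__dict__
    return types.FunctionType(func.__code__, snapshot, func.__name__)


shipped = copy_by_value(mod.compute_clinical)  # snapshot taken while model is None
mod.model = "<mock model>"                     # "preload" sets the attribute later

# The import inside the body fetches the current attribute from sys.modules.
print(shipped("x"))  # {'input': 'x', 'model': '<mock model>'}
```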

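Alternatively, rather than assigning the variable to a module-level global (which can be tricky to reason about in Python), you could try assigning it to the Worker itself.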

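# init_model_worker.py (preload script)
from uuid import uuid4

def dask_setup(worker):
    worker.model = f'<mock model {uuid4()}>'  # attach the model to the Worker

# worker_tasks/model.py
import dask
from dask.distributed import get_worker

@dask.delayed
def compute_clinical(inp):
    # getattr avoids an AttributeError on workers where the preload never ran
    if getattr(get_worker(), 'model', None) is None:
        raise RuntimeError('Model not initialized.')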
