如何使用多处理实现发布/订阅模式?

How can I implement a pub/sub pattern using multiprocessing?(如何使用多处理实现发布/订阅模式?)
本文介绍了如何使用多处理实现发布/订阅模式?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有没有办法使用multiprocessing数据结构创建发布/订阅模式?换句话说,我希望拥有类似队列的东西,只是发布者可以同时向多个工作进程发送单个命令。

推荐答案

您可以创建自己的数据结构,以使用multiprocessing.Queue包装器实现简单的发布/订阅模式:

import os
import multiprocessing
from functools import wraps


def ensure_parent(func):
    @wraps(func)
    def inner(self, *args, **kwargs):
        if os.getpid() != self._creator_pid:
            raise RuntimeError("{} can only be called in the "
                               "parent.".format(func.__name__))
        return func(self, *args, **kwargs)
    return inner

class PublishQueue(object):
    def __init__(self):
        self._queues = []
        self._creator_pid = os.getpid()

    def __getstate__(self):
        self_dict = self.__dict__
        self_dict['_queues'] = []
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)

    @ensure_parent
    def register(self):
        q = multiprocessing.Queue()
        self._queues.append(q)
        return q

    @ensure_parent
    def publish(self, val):
        for q in self._queues:
            q.put(val)

def worker(q):
    for item in iter(q.get, None):
        print("got item {} in process {}".format(item, os.getpid()))

if __name__ == "__main__":
    q = PublishQueue()
    processes = []
    for _ in range(3):
        p = multiprocessing.Process(target=worker, args=(q.register(),))
        p.start()
        processes.append(p)
    q.publish('1')
    q.publish(2)
    q.publish(None)  # Shut down workers

    for p in processes:
        p.join()

输出:

got item 1 in process 4383
got item 2 in process 4383
got item 1 in process 4381
got item 2 in process 4381
got item 1 in process 4382
got item 2 in process 4382
只要父进程是唯一执行发布的进程,并且为父进程中的每个工作进程注册一个订阅队列,然后使用其multiprocessing.Process构造函数将订阅队列传递给工作进程,则此模式就可以很好地工作。这些限制是由于multiprocessing.Queue不可拾取。如果需要将订阅队列传递给已在运行的工作进程,则需要调整实现以使用multiprocessing.Manager.Queue

这篇关于如何使用多处理实现发布/订阅模式?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!

相关文档推荐

Leetcode 234: Palindrome LinkedList(Leetcode 234:回文链接列表)
How do I read an Excel file directly from Dropbox#39;s API using pandas.read_excel()?(如何使用PANDAS.READ_EXCEL()直接从Dropbox的API读取Excel文件?)
subprocess.Popen tries to write to nonexistent pipe(子进程。打开尝试写入不存在的管道)
I want to realize Popen-code from Windows to Linux:(我想实现从Windows到Linux的POpen-code:)
Reading stdout from a subprocess in real time(实时读取子进程中的标准输出)
How to call type safely on a random file in Python?(如何在Python中安全地调用随机文件上的类型?)