本文介绍了在一个单独的线程中为所有连接无限期地生成内容?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有一个Twisted项目,该项目寻求在JSON中通过TCP重新广播收集的数据。我实际上有一个USB库,我需要订阅它并在While循环中无限期地同步读取,如下所示:
while True:
for line in usbDevice.streamData():
data = MyBrandSpankingNewUSBDeviceData(line)
# parse the data, convert to JSON
output = convertDataToJSON(data)
# broadcast the data
...
当然,问题在于...
。本质上,我需要在服务器启动时立即启动该进程,并在服务器结束时结束它(Protocol.doStart
和Protocol.doStop
),并使其不断运行并向每个连接的传输广播output
。
如何在Twisted中执行此操作?显然,我需要让While循环在它自己的线程中运行,但是我如何"订阅"客户端来监听输出呢?USB数据收集只运行一次也很重要,因为它多次运行可能会严重扰乱事情。
简而言之,我的架构如下:
- 服务器有一个USB集线器,该集线器一直在传输数据。服务器不断订阅此USB集线器并不断读取数据。
- 客户端来来去去,可以随意连接和断开。
推荐答案
您可能想要做的一件事是尝试扩展公共协议/传输独立性。即使您需要一个具有长时间运行循环的线程,您也可以对协议隐藏这一点。好处与往常一样:协议变得更易于测试,如果您曾经设法拥有读取USB事件的非线程实现,您只需更改传输,而无需更改协议。
from threading import Thread
class USBThingy(Thread):
def __init__(self, reactor, device, protocol):
self._reactor = reactor
self._device = device
self._protocol = protocol
def run(self):
while True:
for line in self._device.streamData():
self._reactor.callFromThread(self._protocol.usbStreamLineReceived, line)
<2-4]>的使用是此解决方案可用的一部分。它确保usbStreamLineReceived
方法在反应线程中调用,而不是在从USB设备读取的线程中调用。因此,从协议对象的角度来看,线程化并没有什么特别之处:它只是在有数据要处理时偶尔调用它的方法。
然后,您的协议只需以某种方式实现usbStreamLineReceived
,并实现其他特定于应用程序的逻辑,如保存观察者列表:
class SomeUSBProtocol(object):
def __init__(self):
self.observers = []
def usbStreamLineReceived(self, line):
data = MyBrandSpankingNewUSBDeviceData(line)
# broadcast the data
for obs in self.observers[:]:
obs(output)
然后观察者可以向此类的实例注册自己,并对数据执行任何他们想要的操作:
class USBObserverThing(Protocol):
def connectionMade(self):
self.factory.usbProto.observers.append(self.emit)
def connectionLost(self):
self.factory.usbProto.observers.remove(self.emit)
def emit(self, output):
# parse the data, convert to JSON
output = convertDataToJSON(data)
self.transport.write(output)
全部连接在一起:
usbDevice = ...
usbProto = SomeUSBProtocol()
thingy = USBThingy(reactor, usbDevice, usbProto)
thingy.start()
factory = ServerFactory()
factory.protocol = USBObserverThing
factory.usbProto = usbProto
reactor.listenTCP(12345, factory)
reactor.run()
您可以想象一个更好的观察者注册/注销API(就像使用实际方法而不是直接访问该列表)。您还可以想象为USBThingy
提供一个关闭方法,以便SomeUSBProtocol
可以控制它何时停止运行(这样您的进程实际上将能够退出)。
这篇关于在一个单独的线程中为所有连接无限期地生成内容?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!