问题描述
首先介绍一下上下文:我正在学习 C++11 中的线程,为此,我正在尝试构建一个小的 actor
类,本质上是这样的(我把异常处理和传播的东西排除在外):
First a little context: I'm in the process of learning about threading in C++11 and for this purpose, I'm trying to build a small actor
class, essentially (I left the exception handling and propagation stuff out) like so:
class actor {
private: std::atomic<bool> stop;
private: std::condition_variable interrupt;
private: std::thread actor_thread;
private: message_queue incoming_msgs;
public: actor()
: stop(false),
actor_thread([&]{ run_actor(); })
{}
public: virtual ~actor() {
// if the actor is destroyed, we must ensure the thread dies too
stop = true;
// to this end, we have to interrupt the actor thread which is most probably
// waiting on the incoming_msgs queue:
interrupt.notify_all();
actor_thread.join();
}
private: virtual void run_actor() {
try {
while(!stop)
// wait for new message and process it
// but interrupt the waiting process if interrupt is signaled:
process(incoming_msgs.wait_and_pop(interrupt));
}
catch(interrupted_exception) {
// ...
}
};
private: virtual void process(const message&) = 0;
// ...
};
每个参与者都在自己的 actor_thread
中运行,在 incoming_msgs
上等待新的传入消息,并在消息到达时对其进行处理.
Every actor runs in its own actor_thread
, waits on a new incoming message on incoming_msgs
and -- when a message arrives -- processes it.
actor_thread
是与 actor
一起创建的,并且必须与它一起死亡,这就是为什么我需要在 message_queue 中使用某种中断机制::wait_and_pop(std::condition_variable 中断)
.
The actor_thread
is created together with the actor
and has to die together with it, which is why I need some kind of interrupt mechanism in the message_queue::wait_and_pop(std::condition_variable interrupt)
.
基本上,我要求 wait_and_pop
阻塞,直到a) 一个新的 message
到达或b) 直到 interrupt
被触发,在这种情况下——理想情况下——将抛出一个 interrupted_exception
.
Essentially, I require that wait_and_pop
blocks until either
a) a new message
arrives or
b) until the interrupt
is fired, in which case -- ideally -- an interrupted_exception
is to be thrown.
message_queue
中新消息的到达目前也由 std::condition_variable new_msg_notification
建模:
The arrival of a new message in the message_queue
is presently modeled also by a std::condition_variable new_msg_notification
:
// ...
// in class message_queue:
message wait_and_pop(std::condition_variable& interrupt) {
std::unique_lock<std::mutex> lock(mutex);
// How to interrupt the following, when interrupt fires??
new_msg_notification.wait(lock,[&]{
return !queue.empty();
});
auto msg(std::move(queue.front()));
queue.pop();
return msg;
}
长话短说,问题是这样的:如何在 new_msg_notification.wait(...)
interrupt
被触发(不引入超时)?
To cut the long story short, the question is this: How do I interrupt the waiting for a new message in new_msg_notification.wait(...)
when the interrupt
is triggered (without introducing a time-out)?
或者,问题可以理解为:我如何等待两个 std::condition_variable
中的任何一个发出信号?
Alternatively, the question may be read as: How do I wait until any one of two std::condition_variable
s are signaled?
一种天真的方法似乎是根本不使用 std::condition_variable
作为中断,而只使用原子标志 std::atomic<bool>中断
,然后忙于等待new_msg_notification
,超时时间非常短,直到有新消息到达或直到true==interrupted
.但是,我非常希望避免忙于等待.
One naive approach seems to be not to use std::condition_variable
at all for the interrupt and instead just use an atomic flag std::atomic<bool> interrupted
and then busy wait on new_msg_notification
with a very small time-out until either a new message has arrived or until true==interrupted
. However, I would very much like to avoid busy waiting.
从 pilcrow 的评论和回答来看,基本上有两种可能的方法.
From the comments and the answer by pilcrow, it looks like there are basically two approaches possible.
- 根据 Alan、mukunda 和 pilcrow 的建议,将特殊的终止"消息排入队列.我决定反对这个选项,因为我不知道我希望演员终止时队列的大小.很有可能(当我想要快速终止某些东西时,大多数情况下)队列中有数千条消息要处理,并且等待它们被处理直到最终终止消息得到它似乎是不可接受的转.
- 实现条件变量的自定义版本,通过将通知转发到第一个线程正在等待的条件变量,可能会被另一个线程中断.我选择了这种方法.
对于那些感兴趣的人,我的实现如下.在我的例子中,条件变量实际上是一个 semaphore
(因为我更喜欢它们,也因为我喜欢这样做的练习).我为这个信号量配备了一个相关的interrupt
,它可以通过semaphore::get_interrupt()
从信号量中获得.如果现在一个线程阻塞在 semaphore::wait()
中,另一个线程有可能在信号量中断时调用 semaphore::interrupt::trigger()
,导致第一个线程解除阻塞并传播 interrupt_exception
.
For those of you interested, my implementation goes as follows. The condition variable in my case is actually a semaphore
(because I like them more and because I liked the exercise of doing so). I equipped this semaphore with an associated interrupt
which can be obtained from the semaphore via semaphore::get_interrupt()
. If now one thread blocks in semaphore::wait()
, another thread has the possibility to call semaphore::interrupt::trigger()
on the interrupt of the semaphore, causing the first thread to unblock and propagate an interrupt_exception
.
struct
interrupt_exception {};
class
semaphore {
public: class interrupt;
private: mutable std::mutex mutex;
// must be declared after our mutex due to construction order!
private: interrupt* informed_by;
private: std::atomic<long> counter;
private: std::condition_variable cond;
public:
semaphore();
public:
~semaphore() throw();
public: void
wait();
public: interrupt&
get_interrupt() const { return *informed_by; }
public: void
post() {
std::lock_guard<std::mutex> lock(mutex);
counter++;
cond.notify_one(); // never throws
}
public: unsigned long
load () const {
return counter.load();
}
};
class
semaphore::interrupt {
private: semaphore *forward_posts_to;
private: std::atomic<bool> triggered;
public:
interrupt(semaphore *forward_posts_to) : triggered(false), forward_posts_to(forward_posts_to) {
assert(forward_posts_to);
std::lock_guard<std::mutex> lock(forward_posts_to->mutex);
forward_posts_to->informed_by = this;
}
public: void
trigger() {
assert(forward_posts_to);
std::lock_guard<std::mutex>(forward_posts_to->mutex);
triggered = true;
forward_posts_to->cond.notify_one(); // never throws
}
public: bool
is_triggered () const throw() {
return triggered.load();
}
public: void
reset () throw() {
return triggered.store(false);
}
};
semaphore::semaphore() : counter(0L), informed_by(new interrupt(this)) {}
// must be declared here because otherwise semaphore::interrupt is an incomplete type
semaphore::~semaphore() throw() {
delete informed_by;
}
void
semaphore::wait() {
std::unique_lock<std::mutex> lock(mutex);
if(0L==counter) {
cond.wait(lock,[&]{
if(informed_by->is_triggered())
throw interrupt_exception();
return counter>0;
});
}
counter--;
}
使用这个 semaphore
,我的消息队列实现现在看起来像这样(使用信号量而不是 std::condition_variable
我可以摆脱 std::互斥体
:
Using this semaphore
, my message queue implementation now looks like this (using the semaphore instead of the std::condition_variable
I could get rid of the std::mutex
:
class
message_queue {
private: std::queue<message> queue;
private: semaphore new_msg_notification;
public: void
push(message&& msg) {
queue.push(std::move(msg));
new_msg_notification.post();
}
public: const message
wait_and_pop() {
new_msg_notification.wait();
auto msg(std::move(queue.front()));
queue.pop();
return msg;
}
public: semaphore::interrupt&
get_interrupt() const { return new_msg_notification.get_interrupt(); }
};
我的 actor
,现在能够在其线程中以非常低的延迟中断其线程.目前的实现是这样的:
My actor
, is now able to interrupt its thread with very low latency in its thread. The implementation presently like this:
class
actor {
private: message_queue
incoming_msgs;
/// must be declared after incoming_msgs due to construction order!
private: semaphore::interrupt&
interrupt;
private: std::thread
my_thread;
private: std::exception_ptr
exception;
public:
actor()
: interrupt(incoming_msgs.get_interrupt()), my_thread(
[&]{
try {
run_actor();
}
catch(...) {
exception = std::current_exception();
}
})
{}
private: virtual void
run_actor() {
while(!interrupt.is_triggered())
process(incoming_msgs.wait_and_pop());
};
private: virtual void
process(const message&) = 0;
public: void
notify(message&& msg_in) {
incoming_msgs.push(std::forward<message>(msg_in));
}
public: virtual
~actor() throw (interrupt_exception) {
interrupt.trigger();
my_thread.join();
if(exception)
std::rethrow_exception(exception);
}
};
推荐答案
你问,
在 C++11 中等待多个条件变量的最佳方式是什么?
你不能,而且必须重新设计.一个线程一次只能等待一个条件变量(及其关联的互斥体).在这方面,Windows 的同步工具比POSIX 风格"系列的同步原语更丰富.
You can't, and must redesign. One thread may wait on only one condition variable (and its associated mutex) at a time. In this regard the Windows facilities for synchronization are rather richer than those of the "POSIX-style" family of synchronization primitives.
使用线程安全队列的典型方法是将特殊的全部完成!"加入队列.消息,或设计一个可破坏"(或可关闭")队列.在后一种情况下,队列的内部条件变量会保护一个复杂的谓词:要么一个项目可用要么队列已损坏.
The typical approach with thread-safe queues is to enqueue a special "all done!" message, or to design a "breakable" (or "shutdown-able") queue. In the latter case, the queue's internal condition variable then protects a complex predicate: either an item is available or the queue has been broken.
在评论中你观察到
如果没有人在等待,则 notify_all() 将无效
这是真的,但可能不相关.wait()
条件变量也意味着检查谓词,并在实际阻塞通知之前检查它.因此,一个忙于处理错过"notify_all()
的队列项目的工作线程将在下一次检查队列条件时看到谓词(新项目可用,或者,队列已完成)已更改.
That's true but probably not relevant. wait()
ing on a condition variable also implies checking a predicate, and checking it before actually blocking for a notification. So, a worker thread busy processing a queue item that "misses" a notify_all()
will see, the next time it inspects the queue condition, that the predicate (a new item is available, or, the queue is all done) has changed.
这篇关于在 C++11 中等待多个条件变量的最佳方法是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!