问题描述
RabbitMQ 有没有一种类似于 MSSMQ 的使用方法,可以从队列中弹出 1000 条消息,然后插入数据库并从那里继续.
With RabbitMQ is there a way to use it similar to MSSMQ where one can pop 1000 messages from the queue, then do your inserts to the database and continue from there.
我似乎无法通过订阅频道然后在订阅中的 BasicDeliveryEventArgs 上执行 foreach 来执行此操作,并使用我想要在给定时间处理的最大消息数执行 If 语句.
I cannot seem to do that with a Subscription to a channel and then doing a foreach over the BasicDeliveryEventArgs in the Subscription, with that doing a If statement with the max message count I want to process at the given time.
提前致谢然而,这仍然会从队列中获取所有 22k 消息
Thanks in advance This however still takes all 22k messages from the queue
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare("****", true, false, false, null);
var subscription = new Subscription(channel, "****", false);
int maxMessages = 5;
int i = 0;
foreach (BasicDeliverEventArgs eventArgs in subscription)
{
if (++i == maxMessages)
{
Console.WriteLine("Took 5 messages");
subscription.Ack(eventArgs);
break;
}
}
}
}
推荐答案
我假设你想优化将消息加载到数据库中,通过将它们分组到更大的事务中而不是消耗每个事务的成本信息.强制性警告表明,这样做意味着大量消息可能会一起失败,即使其中只有一个会导致问题,这就是您的处理方式...
I'm assuming that you want to optimise loading of messages into the database by batching up groups of them into larger transactions rather than wearing the cost of a transaction per message. With the obligatory warning that doing so means large groups of messages can fail together, even if only one of them causes a problem, here's how you'd go about it...
在频道上设置QOS:
channel.BasicQos(0, 1000, false);
这将预先获取 1000 条消息并阻止进一步的流量,直到您确认某些内容.请注意,它不会以 1000 个块为单位进行获取.相反,它确保在任何时候最多预获取 1000 条 UNACK 消息.模拟块传输就像首先处理 1000 条消息,然后一次性 ACK 一样简单.
This will pre-fetch 1000 messages and block further traffic until you ACK something. Note that it doesn't fetch in blocks of 1000. Rather, it ensures that a maximum of 1000 UNACK'ed messages are pre-fetched at any one time. Simulating block transfers is as simple as processing the 1000 messages first, and then ACK'ing them all in one go.
请参阅此处和这里比我的解释更权威.
See here and here for a more authoritative explanation than mine.
还有一点:您可能希望在消息可用时立即刷新队列,即使您尚未达到 1000 条消息的配额.您应该能够通过在 foreach
循环内调用 queue.BasicGet()
直到它运行干,然后交付您拥有的任何内容(包括您拉出的消息)来执行此操作subscription
) 到数据库.警告:我自己没有尝试过,所以我可能在说垃圾,但我认为它会起作用.这种方法的美妙之处在于它立即将消息推送到数据库中,而无需等待整批 1000 条消息.如果数据库因处理太多小事务而落后,预取积压将在每个周期之间简单地填满.
One more point: You may want to flush the queue as soon as messages are available, even if you haven't made your quota of 1000 messages. You should be able to do this by calling queue.BasicGet()
inside the foreach
loop until it runs dry, and then delivering whatever you have (including the message you pulled out of subscription
) to the database. Caveat: I haven't tried this myself, so I could be talking rubbish, but I think it'll work. The beauty of this method is that it pushes messages into the database immediately, without having to wait for a full batch of 1000 messages. If the database falls behind from handling too many small transactions, the prefetch backlog will simply fill up more between each cycle.
这篇关于RabbitMQ 和 C#的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!