Problem description
I've created an ASP.NET Core MVC/WebApi site with a RabbitMQ subscriber based on James Still's blog article Real-World PubSub Messaging with RabbitMQ.
In his article he uses a static class to start the queue subscriber and define the event handler for queued events. This static method then instantiates the event handler classes via a static factory class.
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace NST.Web.MessageProcessing
{
    public static class MessageListener
    {
        private static IConnection _connection;
        private static IModel _channel;

        public static void Start(string hostName, string userName, string password, int port)
        {
            var factory = new ConnectionFactory
            {
                HostName = hostName,
                Port = port,
                UserName = userName,
                Password = password,
                VirtualHost = "/",
                AutomaticRecoveryEnabled = true,
                NetworkRecoveryInterval = TimeSpan.FromSeconds(15)
            };

            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();
            _channel.ExchangeDeclare(exchange: "myExchange", type: "direct", durable: true);

            var queueName = "myQueue";
            QueueDeclareOk ok = _channel.QueueDeclare(queueName, true, false, false, null);
            _channel.QueueBind(queue: queueName, exchange: "myExchange", routingKey: "myRoutingKey");

            var consumer = new EventingBasicConsumer(_channel);
            consumer.Received += ConsumerOnReceived;
            _channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
        }

        public static void Stop()
        {
            _channel.Close(200, "Goodbye");
            _connection.Close();
        }

        private static void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea)
        {
            // get the details from the event
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            var messageType = "endpoint"; // hardcoding the message type while we dev...

            // instantiate the appropriate handler based on the message type
            IMessageProcessor processor = MessageHandlerFactory.Create(messageType);
            processor.Process(message);

            // Ack the event on the queue
            IBasicConsumer consumer = (IBasicConsumer)sender;
            consumer.Model.BasicAck(ea.DeliveryTag, false);
        }
    }
}
It works great up to the point where I now need to resolve a service in my message processor factory rather than just write to the console.
using NST.Web.Services;
using System;

namespace NST.Web.MessageProcessing
{
    public static class MessageHandlerFactory
    {
        public static IMessageProcessor Create(string messageType)
        {
            switch (messageType.ToLower())
            {
                case "ipset":
                    // need to resolve IIpSetService here...
                    IIpSetService ipService = ???????
                    return new IpSetMessageProcessor(ipService);
                case "endpoint":
                    // need to resolve IEndpointService here...
                    IEndpointService epService = ???????
                    // create new message processor
                    return new EndpointMessageProcessor(epService);
                default:
                    throw new Exception("Unknown message type");
            }
        }
    }
}
Is there any way to access the ASP.NET Core IoC container to resolve the dependencies? I don't really want to have to spin up the whole stack of dependencies by hand :(
Or is there a better way to subscribe to RabbitMQ from an ASP.NET Core application? I found RestBus, but it hasn't been updated for Core 1.x.
Recommended answer
You can avoid the static classes and use Dependency Injection all the way through, combined with:
- IApplicationLifetime, to start/stop the listener whenever the application starts/stops.
- IServiceProvider, to create instances of the message processors.
First thing, let's move the configuration to its own class that can be populated from the appsettings.json:
public class RabbitOptions
{
    public string HostName { get; set; }
    public string UserName { get; set; }
    public string Password { get; set; }
    public int Port { get; set; }
}

// In appsettings.json:
{
    "Rabbit": {
        "hostName": "192.168.99.100",
        "username": "guest",
        "password": "guest",
        "port": 5672
    }
}
Next, convert MessageHandlerFactory into a non-static class that receives an IServiceProvider as a dependency. It will use the service provider to resolve the message processor instances:
using System;
using Microsoft.Extensions.DependencyInjection; // provides the generic GetService<T>() extension

public class MessageHandlerFactory
{
    private readonly IServiceProvider services;

    public MessageHandlerFactory(IServiceProvider services)
    {
        this.services = services;
    }

    public IMessageProcessor Create(string messageType)
    {
        switch (messageType.ToLower())
        {
            case "ipset":
                return services.GetService<IpSetMessageProcessor>();
            case "endpoint":
                return services.GetService<EndpointMessageProcessor>();
            default:
                throw new Exception("Unknown message type");
        }
    }
}
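To see the factory's resolution step in isolation, here is a minimal sketch that needs only the base class library. DictionaryServiceProvider, EchoProcessor and SketchMessageHandlerFactory are hypothetical names made up for illustration; in the real application the ASP.NET Core container plays the provider's role. The sketch also throws when a processor is not registered instead of returning null, which is an assumption about the desired behaviour rather than something the code above does.

```csharp
using System;
using System.Collections.Generic;

public interface IMessageProcessor
{
    void Process(string message);
}

// Hypothetical processor used only for this illustration.
public class EchoProcessor : IMessageProcessor
{
    public string LastMessage { get; private set; }

    public void Process(string message)
    {
        LastMessage = message;
    }
}

// Hypothetical stand-in for the container: maps a type to a factory delegate.
public class DictionaryServiceProvider : IServiceProvider
{
    private readonly Dictionary<Type, Func<object>> registrations =
        new Dictionary<Type, Func<object>>();

    public void Register<T>(Func<T> create) where T : class
    {
        registrations[typeof(T)] = () => create();
    }

    // IServiceProvider contract: return null when the type is unknown.
    public object GetService(Type serviceType)
    {
        Func<object> create;
        return registrations.TryGetValue(serviceType, out create) ? create() : null;
    }
}

public class SketchMessageHandlerFactory
{
    private readonly IServiceProvider services;

    public SketchMessageHandlerFactory(IServiceProvider services)
    {
        this.services = services;
    }

    public IMessageProcessor Create(string messageType)
    {
        switch (messageType.ToLowerInvariant())
        {
            case "echo":
                return Resolve<EchoProcessor>();
            default:
                throw new ArgumentException("Unknown message type: " + messageType);
        }
    }

    // Fail fast on a missing registration instead of handing back null.
    private T Resolve<T>() where T : class
    {
        var service = services.GetService(typeof(T)) as T;
        if (service == null)
        {
            throw new InvalidOperationException("No registration for " + typeof(T).Name);
        }
        return service;
    }
}
```

Registering a delegate such as `() => new EchoProcessor()` and calling `Create("echo")` yields a fresh processor on every call, which mirrors a transient registration. With the real container, `GetRequiredService<T>()` gives the same fail-fast behaviour.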
This way your message processor classes can receive any dependencies they need in the constructor (as long as you configure them in Startup.ConfigureServices). For example, I am injecting an ILogger into one of my sample processors:
using Microsoft.Extensions.Logging;

public class IpSetMessageProcessor : IMessageProcessor
{
    private ILogger<IpSetMessageProcessor> logger;

    public IpSetMessageProcessor(ILogger<IpSetMessageProcessor> logger)
    {
        this.logger = logger;
    }

    public void Process(string message)
    {
        logger.LogInformation("Received message: {0}", message);
    }
}
Now convert MessageListener into a non-static class that depends on IOptions<RabbitOptions> and MessageHandlerFactory. It's very similar to your original one; I just replaced the parameters of the Start method with the options dependency, and the handler factory is now a dependency instead of a static class:
using System;
using System.Text;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class MessageListener
{
    private readonly RabbitOptions opts;
    private readonly MessageHandlerFactory handlerFactory;
    private IConnection _connection;
    private IModel _channel;

    public MessageListener(IOptions<RabbitOptions> opts, MessageHandlerFactory handlerFactory)
    {
        this.opts = opts.Value;
        this.handlerFactory = handlerFactory;
    }

    public void Start()
    {
        var factory = new ConnectionFactory
        {
            HostName = opts.HostName,
            Port = opts.Port,
            UserName = opts.UserName,
            Password = opts.Password,
            VirtualHost = "/",
            AutomaticRecoveryEnabled = true,
            NetworkRecoveryInterval = TimeSpan.FromSeconds(15)
        };

        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _channel.ExchangeDeclare(exchange: "myExchange", type: "direct", durable: true);

        var queueName = "myQueue";
        QueueDeclareOk ok = _channel.QueueDeclare(queueName, true, false, false, null);
        _channel.QueueBind(queue: queueName, exchange: "myExchange", routingKey: "myRoutingKey");

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += ConsumerOnReceived;
        _channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
    }

    public void Stop()
    {
        _channel.Close(200, "Goodbye");
        _connection.Close();
    }

    private void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea)
    {
        // get the details from the event
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body);
        var messageType = "endpoint"; // hardcoding the message type while we dev...
        //var messageType = Encoding.UTF8.GetString(ea.BasicProperties.Headers["message-type"] as byte[]);

        // instantiate the appropriate handler based on the message type
        IMessageProcessor processor = handlerFactory.Create(messageType);
        processor.Process(message);

        // Ack the event on the queue
        IBasicConsumer consumer = (IBasicConsumer)sender;
        consumer.Model.BasicAck(ea.DeliveryTag, false);
    }
}
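The commented-out line in ConsumerOnReceived reads the message type from a "message-type" header. A minimal sketch of doing that defensively might look like the following. MessageTypeReader is a hypothetical helper, and the sketch assumes the headers collection is a dictionary with string keys and object values whose text values arrive as UTF-8 byte arrays, as the cast in that commented-out line suggests.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper, not part of RabbitMQ.Client.
public static class MessageTypeReader
{
    // headers mirrors the assumed shape of ea.BasicProperties.Headers.
    public static string Read(IDictionary<string, object> headers, string fallback)
    {
        object raw;
        if (headers == null || !headers.TryGetValue("message-type", out raw))
        {
            // No headers at all, or the publisher did not set this one.
            return fallback;
        }

        var bytes = raw as byte[];
        if (bytes != null)
        {
            return Encoding.UTF8.GetString(bytes);
        }

        // Accept a plain string too; anything else falls back.
        return (raw as string) ?? fallback;
    }
}
```

Inside the handler this could be called as `MessageTypeReader.Read(ea.BasicProperties.Headers, "endpoint")`, which keeps the current hardcoded value as the default while publishers are still being updated.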
Almost there. You will need to update the Startup.ConfigureServices method so it knows about your services and options (you can create interfaces for the listener and handler factory if you want):
public void ConfigureServices(IServiceCollection services)
{
    // ...

    // Add RabbitMQ services
    services.Configure<RabbitOptions>(Configuration.GetSection("rabbit"));
    services.AddTransient<MessageListener>();
    services.AddTransient<MessageHandlerFactory>();
    services.AddTransient<IpSetMessageProcessor>();
    services.AddTransient<EndpointMessageProcessor>();
}
Finally, update the Startup.Configure method to take an extra IApplicationLifetime parameter and start/stop the message listener in the ApplicationStarted/ApplicationStopping events (although I noticed a while ago some issues with the ApplicationStopping event using IISExpress, as in this question):
public MessageListener MessageListener { get; private set; }

public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IApplicationLifetime appLifetime)
{
    appLifetime.ApplicationStarted.Register(() =>
    {
        MessageListener = app.ApplicationServices.GetService<MessageListener>();
        MessageListener.Start();
    });

    appLifetime.ApplicationStopping.Register(() =>
    {
        MessageListener.Stop();
    });

    // ...
}
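ApplicationStarted and ApplicationStopping are CancellationToken values, so the wiring above boils down to registering callbacks on two tokens. The sketch below shows that mechanism using only the base class library. FakeListener and ListenerLifetime are hypothetical stand-ins for the real listener and the Startup class, and the null check in the stop callback is an added assumption (it covers a shutdown that happens before the listener was ever started), not something the code above does.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for MessageListener; it only tracks its state.
public class FakeListener
{
    public bool Running { get; private set; }

    public void Start() { Running = true; }

    public void Stop() { Running = false; }
}

// Hypothetical stand-in for the wiring done in Startup.Configure.
public class ListenerLifetime
{
    public FakeListener Listener { get; private set; }

    public void Wire(CancellationToken started, CancellationToken stopping, Func<FakeListener> resolve)
    {
        // Runs when the "started" token is signalled.
        started.Register(() =>
        {
            Listener = resolve();
            Listener.Start();
        });

        // Runs when the "stopping" token is signalled.
        stopping.Register(() =>
        {
            // Guard against stopping before the listener was created.
            if (Listener != null)
            {
                Listener.Stop();
            }
        });
    }
}
```

Two CancellationTokenSource instances can play the role of the host here: cancelling the first triggers the start callback, cancelling the second triggers the stop callback.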