RabbitMQ 是一个功能强大且广泛使用的开源消息代理,可促进分布式系统之间的通信。它实现了高级消息队列协议 (AMQP),使应用程序能够通过队列发送和接收消息,确保可靠和高效的数据传输。RabbitMQ 支持各种消息传递模式,例如发布/订阅、请求/回复和点对点,使其适用于众多用例。
在 .NET 8 的上下文中,集成 RabbitMQ 可以显著增强应用程序的可伸缩性和复原能力。借助最新的 .NET 8 功能,开发人员可以在实现 RabbitMQ 时利用改进的性能和安全性。本文将指导你设置 RabbitMQ,在 .NET 8 环境中配置它,并演示发送和接收消息的实际示例。在本教程结束时,你将对如何利用 RabbitMQ 使用 .NET 8 构建可缩放的分布式系统有深入的了解。
在本文中,我们将了解如何在 .NET Core 8 中实现 RabbitMQ。
若要在 .NET 8 应用程序中实现 RabbitMQ,需要使用特定的 NuGet 包来促进与 RabbitMQ 的通信。应包含的两个主要包是 MassTransit 和 MassTransit.RabbitMQ。MassTransit 是广泛使用的 .NET 服务总线,可简化基于消息的应用程序开发,而 MassTransit.RabbitMQ 则提供与 RabbitMQ 的必要集成。
下面介绍如何将这些包添加到 .NET 8 项目:Here's how you can add these packages to your .NET 8 project:
dotnet add package MassTransit
2. 安装 MassTransit.RabbitMQ:
dotnet add package MassTransit.RabbitMQ
然后在appsetting.json文件中添加以下 RabbitMQ 配置代码:
"RabbitMQ": {
"HostName": "localhost",
"UserName": "guest",
"Password": "guest"
}
现在添加一个类,用于 RabbitMQ 相关设置配置
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace SalesPulse.MessageBroker
{
public class RabbitMQSetting
{
public string? HostName { get; set; }
public string? UserName { get; set; }
public string? Password { get; set; }
}
//RabbitMQ Queue name
public static class RabbitMQQueues
{
public const string OrderValidationQueue = "orderValidationQueue";
public const string AnotherQueue = "anotherQueue";
public const string ThirdQueue = "thirdQueue";
}
}
我们需要一个发布者服务类来发布消息。最好创建一个泛型服务类,以获得更好的代码可重用性。Message publisher 服务类是这样的:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Client;
namespace SalesPulse.MessageBroker.Services
{
public class RabbitMQPublisher<T> : IRabbitMQPublisher<T>
{
private readonly RabbitMQSetting _rabbitMqSetting;
public RabbitMQPublisher(IOptions<RabbitMQSetting> rabbitMqSetting)
{
_rabbitMqSetting = rabbitMqSetting.Value;
}
public async Task PublishMessageAsync(T message, string queueName)
{
var factory = new ConnectionFactory
{
HostName = _rabbitMqSetting.HostName,
UserName = _rabbitMqSetting.UserName,
Password = _rabbitMqSetting.Password
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var messageJson = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(messageJson);
await Task.Run(() => channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body));
}
}
}
并制作接口类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace SalesPulse.MessageBroker.Services
{
public interface IRabbitMQPublisher<T>
{
Task PublishMessageAsync(T message, string queueName);
}
}
所以,我们的发布者服务类已经准备好了,现在我们需要一个发布者模型类,模型类只不过是发布者消息数据的一个实体。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace SalesPulse.MessageBroker
{
public class OrderValidation
{
public Guid? OrderEntryById { get; set; }
public string? OrderEntryByCode { get; set; }
public List<ProductValidation>? Products { get; set; }
public string? SupervisorDeviceId { get; set; }
public long OrderNumber { get; set; }
public Guid NotificationReceiverId { get; set; }
}
public class ProductValidation
{
public Guid? ProductId { get; set; }
public string? ProductCode { get; set; }
public decimal? OrderAmount { get; set; }
public string? ProductName { get; set; }
}
}
我们为 publisher 端所做的工作几乎完成了,现在我们必须在控制器构造函数中注入我们的 publisher 服务类,并使用 publisher 服务方法来发布消息。在我的情况下,当输入订单时,我必须验证订单,并在此基础上收到推送通知。所以我在订单验证队列中发布我的消息
// Prepare validation data
var orderValidation = await _chemistService.GetOrderValidationInfoByOrderEntry(order.OrderEntryBy, order, orderDetailsList);
// publish order validation data
await _orderValidationMqPublisher.PublishMessageAsync(orderValidation, RabbitMQQueues.OrderValidationQueue);
最后,在program.cs文件中配置这些服务。将 AddScoped 用于 Publisher 服务类的依赖项生命周期。
builder.Services.Configure<RabbitMQSetting>(configuration.GetSection("RabbitMQ"));
builder.Services.AddScoped(typeof(IRabbitMQPublisher<>), typeof(RabbitMQPublisher<>));
我们为发布者端所做的工作已经完成。现在我们必须在消费者端进行配置。让我们看看如何配置它..
appsetting 和其他设置类中的设置过程与 producer 相同。并在消费者端添加相同的发布者类,或者您必须在两个类之间映射才能获取数据。
在这里,我需要一个消费者服务类,它基本上是使用队列中的消息。
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using PulseWorker.Data;
using PulseWorker.Domain;
using PulseWorker.Service;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace PulseWorker.MessageBroker
{
public class OrderValidationMessageConsumerService : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<OrderValidationMessageConsumerService> _logger;
private readonly RabbitMQSetting _rabbitMqSetting;
private IConnection _connection;
private IModel _channel;
public OrderValidationMessageConsumerService(IOptions<RabbitMQSetting> rabbitMqSetting, IServiceProvider serviceProvider, ILogger<OrderValidationMessageConsumerService> logger)
{
_rabbitMqSetting = rabbitMqSetting.Value;
_serviceProvider = serviceProvider;
_logger = logger;
var factory = new ConnectionFactory
{
HostName = _rabbitMqSetting.HostName,
UserName = _rabbitMqSetting.UserName,
Password = _rabbitMqSetting.Password
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
StartConsuming(RabbitMQQueues.OrderValidationQueue, stoppingToken);
await Task.CompletedTask;
}
private void StartConsuming(string queueName, CancellationToken cancellationToken)
{
_channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
bool processedSuccessfully = false;
try
{
processedSuccessfully = await ProcessMessageAsync(message);
}
catch (Exception ex)
{
_logger.LogError($"Exception occurred while processing message from queue {queueName}: {ex}");
}
if (processedSuccessfully)
{
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
else
{
_channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
}
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
}
private async Task<bool> ProcessMessageAsync(string message)
{
try
{
using (var scope = _serviceProvider.CreateScope())
{
var notificationService = scope.ServiceProvider.GetRequiredService<INotificationService>();
var notificationRepository = scope.ServiceProvider.GetRequiredService<INotificationRepository>();
var orderValidationRepository = scope.ServiceProvider.GetRequiredService<IValidationRepository>();
var orderValidation = JsonConvert.DeserializeObject<OrderValidation>(message);
if (string.IsNullOrEmpty(orderValidation?.SupervisorDeviceId) || orderValidation.Products == null)
{
return true;
}
foreach (var item in orderValidation.Products)
{
bool isExceed = await orderValidationRepository.IsValidationAmountExceed(item.ProductId);
if (isExceed)
{
var notificationModel = new NotificationModel
{
DeviceId = orderValidation.SupervisorDeviceId,
IsAndroiodDevice = true,
Title = "OrderValidation",
Body = $"Chemist Order - {orderValidation.OrderNumber} exceeds the validation amount for product {item.ProductName}-{item.ProductCode}"
};
var notificationResponse = await notificationService.SendNotificationAsync(notificationModel);
if (!notificationResponse.IsSuccess)
{
return false;
}
var notification = new Notification
{
Title = notificationModel.Title,
Body = notificationModel.Body,
ReceiverId = orderValidation.NotificationReceiverId,
TypeIdentifyId = orderValidation.OrderNumber.ToString(),
Type = notificationModel.Title,
};
await notificationRepository.AddNotificationAsync(notification);
await notificationRepository.SaveNotificationAsync();
return true;
}
}
return true;
}
}
catch (Exception ex)
{
_logger.LogError($"Error processing message: {ex.Message}");
return false;
}
}
public override void Dispose()
{
_channel.Close();
_connection.Close();
base.Dispose();
}
}
}
在这里,我的消费者服务是消费消息并验证订单,如果验证规则中断,则发送推送通知。
现在,在 program.cs 中配置服务类 A 托管服务
// RabbitMQ Configuration
builder.Services.Configure<RabbitMQSetting>(builder.Configuration.GetSection("RabbitMQ"));
// Register the consumer service as a hosted service only
builder.Services.AddHostedService<OrderValidationMessageConsumerService>();
这就是 .NET 8 中 RabbitMQ 的非常基本和简单的设置。