在 .NET8 中通过 RabbitMQ实现消息队列功能

作者:微信公众号:【架构师老卢】
6-5 17:10
83

概述:RabbitMQ 是一个功能强大且广泛使用的开源消息代理,可促进分布式系统之间的通信。它实现了高级消息队列协议 (AMQP),使应用程序能够通过队列发送和接收消息,确保可靠和高效的数据传输。RabbitMQ 支持各种消息传递模式,例如发布/订阅、请求/回复和点对点,使其适用于众多用例。在 .NET 8 的上下文中,集成 RabbitMQ 可以显著增强应用程序的可伸缩性和复原能力。借助最新的 .NET 8 功能,开发人员可以在实现 RabbitMQ 时利用改进的性能和安全性。本文将指导你设置 RabbitMQ,在 .NET 8 环境中配置它,并演示发送和接收消息的实际示例。在本教程结束时,你将对

RabbitMQ 是一个功能强大且广泛使用的开源消息代理,可促进分布式系统之间的通信。它实现了高级消息队列协议 (AMQP),使应用程序能够通过队列发送和接收消息,确保可靠和高效的数据传输。RabbitMQ 支持各种消息传递模式,例如发布/订阅、请求/回复和点对点,使其适用于众多用例。

在 .NET 8 的上下文中,集成 RabbitMQ 可以显著增强应用程序的可伸缩性和复原能力。借助最新的 .NET 8 功能,开发人员可以在实现 RabbitMQ 时利用改进的性能和安全性。本文将指导你设置 RabbitMQ,在 .NET 8 环境中配置它,并演示发送和接收消息的实际示例。在本教程结束时,你将对如何利用 RabbitMQ 使用 .NET 8 构建可缩放的分布式系统有深入的了解。

在本文中,我们将了解如何在 .NET Core 8 中实现 RabbitMQ。

若要在 .NET 8 应用程序中实现 RabbitMQ,需要使用特定的 NuGet 包来促进与 RabbitMQ 的通信。应包含的两个主要包是 MassTransit 和 MassTransit.RabbitMQ。MassTransit 是广泛使用的 .NET 服务总线,可简化基于消息的应用程序开发,而 MassTransit.RabbitMQ 则提供与 RabbitMQ 的必要集成。

下面介绍如何将这些包添加到 .NET 8 项目:Here's how you can add these packages to your .NET 8 project:

  1. 安装 MassTransit:
dotnet add package MassTransit

2. 安装 MassTransit.RabbitMQ:

dotnet add package MassTransit.RabbitMQ

然后在appsetting.json文件中添加以下 RabbitMQ 配置代码:

"RabbitMQ": {  
  "HostName": "localhost",  
  "UserName": "guest",  
  "Password": "guest"  
    
}

现在添加一个类,用于 RabbitMQ 相关设置配置

using System;  
using System.Collections.Generic;  
using System.Linq;  
using System.Text;  
using System.Threading.Tasks;  
  
namespace SalesPulse.MessageBroker  
{  
    public class RabbitMQSetting  
    {  
        public string? HostName { get; set; }  
        public string? UserName { get; set; }  
        public string? Password { get; set; }  
    }  
  
    //RabbitMQ Queue name  
    public static class RabbitMQQueues  
    {  
        public const string OrderValidationQueue = "orderValidationQueue";  
        public const string AnotherQueue = "anotherQueue";  
        public const string ThirdQueue = "thirdQueue";  
    }  
}

我们需要一个发布者服务类来发布消息。最好创建一个泛型服务类,以获得更好的代码可重用性。Message publisher 服务类是这样的:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Client;

namespace SalesPulse.MessageBroker.Services
{
    public class RabbitMQPublisher<T> : IRabbitMQPublisher<T>
    {
        private readonly RabbitMQSetting _rabbitMqSetting;

        public RabbitMQPublisher(IOptions<RabbitMQSetting> rabbitMqSetting)
        {
            _rabbitMqSetting = rabbitMqSetting.Value;
        }

        public async Task PublishMessageAsync(T message, string queueName)
        {

            var factory = new ConnectionFactory
            {
                HostName = _rabbitMqSetting.HostName,
                UserName = _rabbitMqSetting.UserName,
                Password = _rabbitMqSetting.Password
            };

            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();
            channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

            var messageJson = JsonConvert.SerializeObject(message);
            var body = Encoding.UTF8.GetBytes(messageJson);

            await Task.Run(() => channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body));
        }
    }
}

并制作接口类

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SalesPulse.MessageBroker.Services
{
    public interface IRabbitMQPublisher<T>
    {
        Task PublishMessageAsync(T message, string queueName);
    }
}

所以,我们的发布者服务类已经准备好了,现在我们需要一个发布者模型类,模型类只不过是发布者消息数据的一个实体。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SalesPulse.MessageBroker
{
    public class OrderValidation
    {
        public Guid? OrderEntryById { get; set; }
        public string? OrderEntryByCode { get; set; }
        public List<ProductValidation>? Products { get; set; }
        public string? SupervisorDeviceId { get; set; }
        public long OrderNumber { get; set; }
        public Guid NotificationReceiverId { get; set; }
    }

    public class ProductValidation
    {
        public Guid? ProductId { get; set; }
        public string? ProductCode { get; set; }
        public decimal? OrderAmount { get; set; }
        public string? ProductName { get; set; }
    }

}

我们为 publisher 端所做的工作几乎完成了,现在我们必须在控制器构造函数中注入我们的 publisher 服务类,并使用 publisher 服务方法来发布消息。在我的情况下,当输入订单时,我必须验证订单,并在此基础上收到推送通知。所以我在订单验证队列中发布我的消息


// Prepare validation data

var orderValidation = await _chemistService.GetOrderValidationInfoByOrderEntry(order.OrderEntryBy, order, orderDetailsList);

// publish order validation data
await _orderValidationMqPublisher.PublishMessageAsync(orderValidation, RabbitMQQueues.OrderValidationQueue);

最后,在program.cs文件中配置这些服务。将 AddScoped 用于 Publisher 服务类的依赖项生命周期。

builder.Services.Configure<RabbitMQSetting>(configuration.GetSection("RabbitMQ"));
builder.Services.AddScoped(typeof(IRabbitMQPublisher<>), typeof(RabbitMQPublisher<>));

我们为发布者端所做的工作已经完成。现在我们必须在消费者端进行配置。让我们看看如何配置它..

appsetting 和其他设置类中的设置过程与 producer 相同。并在消费者端添加相同的发布者类,或者您必须在两个类之间映射才能获取数据。

在这里,我需要一个消费者服务类,它基本上是使用队列中的消息。

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using PulseWorker.Data;
using PulseWorker.Domain;
using PulseWorker.Service;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace PulseWorker.MessageBroker
{
    public class OrderValidationMessageConsumerService : BackgroundService
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly ILogger<OrderValidationMessageConsumerService> _logger;
        private readonly RabbitMQSetting _rabbitMqSetting;
        private IConnection _connection;
        private IModel _channel;

        public OrderValidationMessageConsumerService(IOptions<RabbitMQSetting> rabbitMqSetting, IServiceProvider serviceProvider, ILogger<OrderValidationMessageConsumerService> logger)
        {
            _rabbitMqSetting = rabbitMqSetting.Value;
            _serviceProvider = serviceProvider;
            _logger = logger;

            var factory = new ConnectionFactory
            {
                HostName = _rabbitMqSetting.HostName,
                UserName = _rabbitMqSetting.UserName,
                Password = _rabbitMqSetting.Password
            };
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            StartConsuming(RabbitMQQueues.OrderValidationQueue, stoppingToken);
            await Task.CompletedTask;
        }

        private void StartConsuming(string queueName, CancellationToken cancellationToken)
        {
            _channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

            var consumer = new EventingBasicConsumer(_channel);
            consumer.Received += async (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);

                bool processedSuccessfully = false;
                try
                {
                    processedSuccessfully = await ProcessMessageAsync(message);
                }
                catch (Exception ex)
                {
                    _logger.LogError($"Exception occurred while processing message from queue {queueName}: {ex}");
                }

                if (processedSuccessfully)
                {
                    _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
                else
                {
                    _channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
                }
            };

            _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        }

        private async Task<bool> ProcessMessageAsync(string message)
        {
            try
            {
                using (var scope = _serviceProvider.CreateScope())
                {
                    var notificationService = scope.ServiceProvider.GetRequiredService<INotificationService>();
                    var notificationRepository = scope.ServiceProvider.GetRequiredService<INotificationRepository>();
                    var orderValidationRepository = scope.ServiceProvider.GetRequiredService<IValidationRepository>();
                    var orderValidation = JsonConvert.DeserializeObject<OrderValidation>(message);

                    if (string.IsNullOrEmpty(orderValidation?.SupervisorDeviceId) || orderValidation.Products == null)
                    {
                        return true;
                    }

                    foreach (var item in orderValidation.Products)
                    {
                        bool isExceed = await orderValidationRepository.IsValidationAmountExceed(item.ProductId);

                        if (isExceed)
                        {
                            var notificationModel = new NotificationModel
                            {
                                DeviceId = orderValidation.SupervisorDeviceId,
                                IsAndroiodDevice = true,
                                Title = "OrderValidation",
                                Body = $"Chemist Order - {orderValidation.OrderNumber} exceeds the validation amount for product {item.ProductName}-{item.ProductCode}"
                            };

                            var notificationResponse = await notificationService.SendNotificationAsync(notificationModel);

                            if (!notificationResponse.IsSuccess)
                            {
                                return false;
                            }

                            var notification = new Notification
                            {
                                Title = notificationModel.Title,
                                Body = notificationModel.Body,
                                ReceiverId = orderValidation.NotificationReceiverId,
                                TypeIdentifyId = orderValidation.OrderNumber.ToString(),
                                Type = notificationModel.Title,
                            };

                            await notificationRepository.AddNotificationAsync(notification);
                            await notificationRepository.SaveNotificationAsync();

                            return true;
                        }
                    }

                    return true;
                }
            }
            catch (Exception ex)
            {
                _logger.LogError($"Error processing message: {ex.Message}");
                return false;
            }
        }

        public override void Dispose()
        {
            _channel.Close();
            _connection.Close();
            base.Dispose();
        }
    }
}

在这里,我的消费者服务是消费消息并验证订单,如果验证规则中断,则发送推送通知。

现在,在 program.cs 中配置服务类 A 托管服务

// RabbitMQ Configuration
builder.Services.Configure<RabbitMQSetting>(builder.Configuration.GetSection("RabbitMQ"));
// Register the consumer service as a hosted service only
builder.Services.AddHostedService<OrderValidationMessageConsumerService>();

这就是 .NET 8 中 RabbitMQ 的非常基本和简单的设置。

阅读排行