如何在核心微服务架构中使用 RabbitMQ 实现 CQRS 模式 ASP.NET

作者:微信公众号:【架构师老卢】
2-14 19:8
142

微服务架构代表了软件设计的范式转变,将大型单体应用程序分解为更小的、可管理的服务,这些服务独立运行并通过定义明确的 API 进行通信。

微服务架构的简要概述

在 C# 中,微服务可以是更大系统的一部分:

using System;
using Microsoft.AspNetCore.Mvc;
[Route("api/[controller]")]
[ApiController]
public class UserController : ControllerBase
{
    [HttpGet]
    public ActionResult<string> GetUser()
    {
        // Logic to fetch user data from a database or external service
        return "User data";
    }
}

在此代码片段中,UserController 公开了一个 HTTP GET 终结点来检索用户数据,展示了此微服务的单一责任。

命令查询责任分离 (CQRS) 模式的说明

CQRS 从根本上将处理命令(更改系统状态)与查询(在不修改状态的情况下检索数据)的职责分开。这种隔离允许针对每种类型的操作进行优化。

// Example of Command and Query models in C#  
public class Command  
{  
    public string Id { get; set; }  
    public object Payload { get; set; }  
}  
  
public class Query  
{  
    public string Id { get; set; }  
}  
// Command Handler  
public class CommandHandler  
{  
    public void HandleCommand(Command command)  
    {  
        // Logic to process and update the system state based on the command  
    }  
}  
// Query Handler  
public class QueryHandler  
{  
    public object HandleQuery(Query query)  
    {  
        // Logic to retrieve and return data without altering the system state  
        return null;  
    }  
}

这种将命令和查询模型与专用处理程序分开的做法简化了代码库,并实现了对写入和读取操作的定制优化。

消息代理(如 RabbitMQ)在微服务中的重要性

以 RabbitMQ 为代表的消息代理在微服务架构中至关重要,它为服务之间的异步通信提供了强大的机制。它们支持不同组件之间的通信解耦、可靠性和可扩展性。

// Example of using RabbitMQ with RabbitMQ.Client in C#  
using RabbitMQ.Client;  
class RabbitMQService  
{  
    public void SendMessageToQueue(string queueName, string message)  
    {  
        var factory = new ConnectionFactory() { HostName = "localhost" };  
        using var connection = factory.CreateConnection();  
        using var channel = connection.CreateModel();  
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);  
        var body = Encoding.UTF8.GetBytes(message);  
        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);  
        Console.WriteLine($"Message sent to {queueName}: {message}");  
    }  
}

在此代码片段中,RabbitMQ 用于将消息发送到特定队列,从而确保微服务之间的可靠通信。

CQRS原理说明

CQRS(命令查询责任隔离)是一种体系结构模式,它提倡在应用程序中的读取和写入操作之间明确分离。它区分用于读取数据(查询)的模型和逻辑以及用于修改数据(命令)的模型和逻辑。

// Example of Command and Query models in C#  
public class Command  
{  
    public string Id { get; set; }  
    public object Payload { get; set; }  
}  
public class Query  
{  
    public string Id { get; set; }  
}

在上面的示例中,Command 和 Query 类分别表示用于处理写入和读取操作的不同模型。这种分离有助于隔离关注点并简化每种操作类型的逻辑。

分离读写操作的优点

分离读取和写入操作具有以下几个优点:

  • **优化:**不同的模型可以针对其特定任务进行优化。例如,可以针对快速读取定制查询模型。
  • **可扩展性:**系统可以独立扩展以进行读取和写入,从而优化性能。
  • **灵活性:**修改写入逻辑不会影响读取操作,反之亦然,从而在设计和演进方面提供更大的灵活性。
// Command Handler  
public class CommandHandler  
{  
    public void HandleCommand(Command command)  
    {  
        // Logic to process and update the system state based on the command  
    }  
}  
  
// Query Handler  
public class QueryHandler  
{  
    public object HandleQuery(Query query)  
    {  
        // Logic to retrieve and return data without altering the system state  
        return null;  
    }  
}

将命令和查询的处理分开,可以根据每个操作的特定要求定制专用逻辑。

CQRS 大放异彩的用例和场景

CQRS 在以下情况下特别有用:

  • **复杂域:**处理复杂业务逻辑的系统受益于 CQRS 提供的关注点分离。
  • **性能优化:**需要针对高吞吐量读取和写入操作进行优化的应用程序。
  • **事件溯源:**CQRS 通过提供单独的模型来处理基于事件的命令和查询,从而补充了事件溯源。
// Example of using CQRS with Event Sourcing  
public class EventSourcingHandler  
{  
    public void ApplyEvent(Event event)  
    {  
        // Logic to apply an event and update the system state  
    }  
    public object GetState()  
    {  
        // Logic to reconstruct the system state based on events for query purposes  
        return null;  
    }  
}

通过将 CQRS 与事件溯源结合使用,应用程序可以在处理命令/事件和查询数据之间保持明确的分离。

RabbitMQ 作为消息代理的概述

RabbitMQ 是一个强大的开源消息代理,可促进分布式应用程序之间的通信。它充当中介,使应用程序的各个组件能够无缝地通信和传输数据。

// Example of using RabbitMQ with RabbitMQ.Client in C#  
using RabbitMQ.Client;  
class RabbitMQService  
{  
    public void SendMessageToQueue(string queueName, string message)  
    {  
        var factory = new ConnectionFactory() { HostName = "localhost" };  
        using var connection = factory.CreateConnection();  
        using var channel = connection.CreateModel();  
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);  
        var body = Encoding.UTF8.GetBytes(message);  
        channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);  
        Console.WriteLine($"Message sent to {queueName}: {message}");  
    }  
}

在上面的代码中,描述了一个类,展示了如何使用 RabbitMQ 的客户端库将消息发送到特定队列。RabbitMQService

微服务架构的主要特性和优势:

RabbitMQ 提供了多项功能,使其非常适合微服务架构:

  • **可靠性:**它确保消息传递并支持消息确认机制。
  • **灵活性:**支持各种消息传递模式(发布/订阅、点对点)和协议(AMQP、MQTT)。
  • **可扩展性:**允许通过在多个节点或集群之间分发消息进行水平扩展。
// Example of using RabbitMQ for Publish-Subscribe  
public class Publisher  
{  
    public void Publish(string exchangeName, string message)  
    {  
        var factory = new ConnectionFactory() { HostName = "localhost" };  
        using var connection = factory.CreateConnection();  
        using var channel = connection.CreateModel();  
        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);  
        var body = Encoding.UTF8.GetBytes(message);  
        channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);  
        Console.WriteLine($"Message published to {exchangeName}: {message}");  
    }  
}

代码片段通过向交易所发布消息来演示 RabbitMQ 的发布-订阅功能。

RabbitMQ 如何促进异步通信?

RabbitMQ 通过解耦发送方和接收方组件来促进异步通信,允许它们独立运行。它通过消息队列实现这一点,确保消息在应用程序的不同部分之间可靠地传递。

// Example of consuming messages from a RabbitMQ queue  
class Consumer  
{  
    public void ConsumeFromQueue(string queueName)  
    {  
        var factory = new ConnectionFactory() { HostName = "localhost" };  
        using var connection = factory.CreateConnection();  
        using var channel = connection.CreateModel();  
        channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);  
        var consumer = new EventingBasicConsumer(channel);  
        consumer.Received += (model, ea) =>  
        {  
            var body = ea.Body.ToArray();  
            var message = Encoding.UTF8.GetString(body);  
            Console.WriteLine($"Message received from {queueName}: {message}");  
        };  
        channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);  
    }  
}

该类演示了如何使用来自特定队列的消息,从而允许组件异步处理收到的消息。Consumer

将 RabbitMQ 与 CQRS 集成

将 CQRS 与 RabbitMQ 结合使用时的设计注意事项

将 CQRS 与 RabbitMQ 集成时,应考虑以下几个设计注意事项:

  • **消息结构:**以清晰、一致的格式设计命令和事件的消息。
  • **错误处理:**实施用于处理消息处理中的错误和重试的策略。
  • 消息持久性: 配置队列以保证消息持久性,以避免数据丢失。
  • **可扩展性:**通过考虑 RabbitMQ 集群和负载均衡来规划可伸缩性。

使用 RabbitMQ 作为消息传递主干的命令处理

使用 RabbitMQ 作为命令处理的消息传递主干涉及将命令发送到队列,这些命令稍后由处理程序使用以进行处理。

现在,让我们考虑一个在线订购系统的场景,在该场景中,我们将 RabbitMQ 与 C# 中的 CQRS 集成在一起,以异步处理订单:

场景:

在在线订购系统中,当下新订单时,需要异步处理。我们将使用 RabbitMQ 来处理命令(下订单)和事件(订单处理)。系统将按照 CQRS 原则,使用队列分离命令和事件。

设计注意事项:

  • **订单命令:**表示下订单的命令。
  • **订单事件:**表示已处理订单的事件。
  • **错误处理:**对失败的订单实施重试机制。

命令处理:

public class OrderCommandHandler  
{  
    private readonly string commandQueueName = "order\_commands";  
  
    public void SendOrderCommand(OrderCommand command)  
    {  
        var factory = new ConnectionFactory() { HostName = "localhost" };  
        using var connection = factory.CreateConnection();  
        using var channel = connection.CreateModel();  
        channel.QueueDeclare(queue: commandQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);  
        var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(command));  
        channel.BasicPublish(exchange: "", routingKey: commandQueueName, basicProperties: null, body: body);  
        Console.WriteLine($"Order command sent: {JsonConvert.SerializeObject(command)}");  
    }  
    public void ConsumeOrderCommands()  
    {  
        var factory = new ConnectionFactory() { HostName = "localhost" };  
        using var connection = factory.CreateConnection();  
        using var channel = connection.CreateModel();  
        channel.QueueDeclare(queue: commandQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);  
        var consumer = new EventingBasicConsumer(channel);  
        consumer.Received += (model, ea) =>  
        {  
            var body = ea.Body.ToArray();  
            var commandMessage = Encoding.UTF8.GetString(body);  
            var orderCommand = JsonConvert.DeserializeObject\<OrderCommand>(commandMessage);  
  
            // Process the order command  
            Task.Run(() => ProcessOrderCommand(orderCommand));  
  
            // Acknowledge the message  
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);  
        };  
        channel.BasicConsume(queue: commandQueueName, autoAck: false, consumer: consumer);  
    }  
    private void ProcessOrderCommand(OrderCommand orderCommand)  
    {  
        // Logic to process the order command asynchronously  
        Console.WriteLine($"Processing order command: {JsonConvert.SerializeObject(orderCommand)}");  
          
        // Place order, perform validation, etc.  
        // If successful, publish an order processed event  
        var orderEvent = new OrderEvent { OrderId = orderCommand.OrderId, Status = "Processed" };  
        SendOrderProcessedEvent(orderEvent);  
    }  
    private void SendOrderProcessedEvent(OrderEvent orderEvent)  
    {  
        var eventQueueName = "order\_events";  
        var factory = new ConnectionFactory() { HostName = "localhost" };  
        using var connection = factory.CreateConnection();  
        using var channel = connection.CreateModel();  
        channel.QueueDeclare(queue: eventQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);  
        var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(orderEvent));  
        channel.BasicPublish(exchange: "", routingKey: eventQueueName, basicProperties: null, body: body);  
        Console.WriteLine($"Order processed event sent: {JsonConvert.SerializeObject(orderEvent)}");  
    }  
}

为命令和事件实现消息队列

在具有 RabbitMQ 集成的基于 CQRS 的系统中,为命令和事件建立了单独的队列,以实现组件之间的异步通信。

public class OrderEventConsumer  
{  
    private readonly string eventQueueName = "order\_events";  
  
    public void ConsumeOrderEvents()  
    {  
        var factory = new ConnectionFactory() { HostName = "localhost" };  
        using var connection = factory.CreateConnection();  
        using var channel = connection.CreateModel();  
        channel.QueueDeclare(queue: eventQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);  
        var consumer = new EventingBasicConsumer(channel);  
        consumer.Received += (model, ea) =>  
        {  
            var body = ea.Body.ToArray();  
            var eventMessage = Encoding.UTF8.GetString(body);  
            var orderEvent = JsonConvert.DeserializeObject\<OrderEvent>(eventMessage);  
            Console.WriteLine($"Received order processed event: {JsonConvert.SerializeObject(orderEvent)}");  
            // Logic to handle the processed order event  
        };  
        channel.BasicConsume(queue: eventQueueName, autoAck: true, consumer: consumer);  
    }  
}

异步通信和事件驱动架构

RabbitMQ 允许组件以非阻塞方式对事件和消息做出反应,从而促进事件驱动架构中的异步通信。

public class Program  
{  
    public static void Main(string[] args)  
    {  
        var orderCommandHandler = new OrderCommandHandler();  
        var orderEventConsumer = new OrderEventConsumer();  
  
        // Example: Sending an order command  
        var orderCommand = new OrderCommand { OrderId = Guid.NewGuid(), Product = "Product A", Quantity = 2 };  
        orderCommandHandler.SendOrderCommand(orderCommand);  
  
        // Consume order commands and events asynchronously  
        Task.Run(() => orderCommandHandler.ConsumeOrderCommands());  
        Task.Run(() => orderEventConsumer.ConsumeOrderEvents());  
        Console.ReadLine(); // Keep the application running  
    }  
}

在微服务中使用 RabbitMQ 实现 CQRS

设置微服务基础结构

为简单起见,让我们创建两个微服务:一个用于处理命令 (OrderCommandService),另一个用于处理查询 (OrderQueryService)。每个服务将处理 CQRS 模式的特定方面。

OrderCommandService

// OrderCommandService: Handles commands (placing orders)  
public class OrderCommandService  
{  
    private readonly string commandQueueName = "order\_commands";  
    public void SendOrderCommand(OrderCommand command)  
    {  
        // Code to send order command to RabbitMQ queue (similar to previously shown CommandSender)  
    }  
    public void ConsumeOrderCommands()  
    {  
        // Code to consume order commands from RabbitMQ queue (similar to previously shown CommandConsumer)  
        // Process received commands asynchronously and trigger events accordingly  
    }  
}

OrderQueryService

// OrderQueryService: Handles queries (fetching orders)  
public class OrderQueryService  
{  
    private readonly string queryQueueName = "order_queries";  
    public void SendOrderQuery(Query query)  
    {  
        // Code to send order query to RabbitMQ queue (similar to previously shown CommandSender)  
    }  
    public void ConsumeOrderQueries()  
    {  
        // Code to consume order queries from RabbitMQ queue (similar to previously shown CommandConsumer)  
        // Process received queries and retrieve orders data asynchronously  
    }  
}

在微服务中定义命令和查询模型

命令和查询模型

// Command model  
public class OrderCommand  
{  
    public string OrderId { get; set; }  
    // Other order-related properties...  
}  
// Query model  
public class OrderQuery  
{  
    public string QueryId { get; set; }  
    // Other query-related properties...  
}

使用 RabbitMQ 编写命令、处理事件和查询

OrderCommandService

// Sending order commands  
OrderCommandService orderCommandService = new OrderCommandService();  
OrderCommand orderCommand = new OrderCommand { OrderId = "123", /* Other order properties */ };  
orderCommandService.SendOrderCommand(orderCommand);  
// Consuming order commands  
orderCommandService.ConsumeOrderCommands();

OrderQueryService

// Sending order queries  
OrderQueryService orderQueryService = new OrderQueryService();  
OrderQuery orderQuery = new OrderQuery { QueryId = "456", /* Other query properties */ };  
orderQueryService.SendOrderQuery(orderQuery);  
// Consuming order queries  
orderQueryService.ConsumeOrderQueries();

确保数据一致性和最终一致性

实现数据一致性和最终一致性将涉及其他步骤,例如使用事件溯源、使用一致的读取模型进行查询,以及确保正确处理微服务中的事件。这涉及超出简单代码片段范围的更详细的实现,重点关注如何处理和传播事件以保持微服务之间的一致性。

此示例演示了使用 CQRS 和 RabbitMQ 的在线订购系统的基本微服务设置,概述了用于处理命令和查询的服务之间的结构和交互。在实际场景中,实现完全数据一致性和最终一致性通常需要对事件、数据存储机制和错误恢复策略进行更复杂的处理。

用于其他学习的资源、文章和书籍列表

  1. 书:
  • Gregor Hohpe 和 Bobby Woolf 的“企业集成模式”
  • “RabbitMQ in Action”,作者:Alvaro Videla 和 Jason J.W. Williams
  • Martin Kleppmann 的“设计数据密集型应用程序”

2. 文章:

  • Microsoft 模式和实践团队的“CQRS 之旅”
  • “探索 CQRS 和事件溯源”,作者:Greg Young
  • “微服务架构:CQRS 和事件溯源”,作者:Chris Richardson
  • RabbitMQ 官网“RabbitMQ 教程”

相关文档和案例研究的链接

  1. 官方文档:

2. 案例研究和白皮书:

引用来源和研究的参考文献

  1. 霍普、格雷戈尔和伍尔夫、鲍比。“企业集成模式”。
  2. Videla、Alvaro 和 Williams、Jason J.W. “RabbitMQ 在行动”。
  3. 克莱普曼,马丁。“设计数据密集型应用程序。”
  4. Microsoft 模式和实践团队。“CQRS 之旅。”
  5. 年轻,格雷格。“探索 CQRS 和事件溯源。”
  6. 理查森,克里斯。“微服务架构:CQRS 和事件溯源。”
  7. RabbitMQ 官方文档。
  8. Microsoft Azure 文档 — CQRS 模式。
  9. Netflix 技术博客 — 可扩展微服务案例研究。
  10. Uber 工程博客 — 关于扩展微服务的案例研究。

在微服务架构中使用 RabbitMQ 实现 CQRS 提供了一种强大的方法来构建可扩展的解耦系统,这些系统可以有效地处理复杂的操作。探索更多资源、文档、案例研究和文献非常重要,以便更深入地了解此体系结构方法中涉及的原则和最佳实践。

相关留言评论
昵称:
邮箱:
阅读排行