实战指南:Dapr + .NET 10 + PostgreSQL + Dapper 构建高效微服务后端

作者:微信公众号:【架构师老卢】
7-30 8:15
7

本文向您展示如何将 Dapr 集成到微服务应用程序中并管理服务间的通信。我们将以 POS(销售点)应用后端为例进行演示。

解决方案架构

POS 后端微服务
├── Dapr (用于分布式应用运行时)
├── PostgreSQL (数据存储)
├── Redis (Pub/Sub 消息代理)
├── Dapper (微型 ORM)
└── .NET 10 (运行时)

实现步骤

1. 创建 .NET 10 项目

dotnet new webapi -n PosBackend -f net10.0
cd PosBackend
dotnet add package Dapr.AspNetCore
dotnet add package Dapper
dotnet add package Npgsql
# Program.cs calls AddSwaggerGen/UseSwagger/UseSwaggerUI, which are provided by Swashbuckle.
# The webapi template has not referenced it since .NET 9, so it is added explicitly.
dotnet add package Swashbuckle.AspNetCore

dotnet new globaljson --sdk-version 10.0.100-preview.6.25358.103

2. 配置 Dapr 组件:在 components 文件夹中创建以下文件:

pubsub.yaml

# Dapr component definition for the pub/sub building block, backed by Redis.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  # Component name; the application publishes and subscribes using this name.
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  # Address (host:port) of the Redis instance used by this component.
  - name: redisHost
    value: "localhost:6379"

如果未安装 PostgreSQL 数据库,请安装:

# Starts a detached postgres:15 container named "pgdb" with user "postgres", password "password",
# database "posdb", and container port 5432 published on host port 5432.
docker run -d --name pgdb  -e POSTGRES_USER=postgres  -e POSTGRES_PASSWORD=password  -e POSTGRES_DB=posdb  -p 5432:5432 postgres:15

statestore.yaml

# Dapr component definition for the state store, backed by PostgreSQL.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  # Component name; the application reads and writes state using this name.
  name: statestore
spec:
  type: state.postgresql
  version: v1
  initTimeout: 30s  # raised from the default of 5 seconds
  metadata:
  # Credentials and database must match the PostgreSQL instance started with the docker command
  # (POSTGRES_USER=postgres, POSTGRES_PASSWORD=password, POSTGRES_DB=posdb); the previous
  # user=hasan/password=postgres pair does not exist in that container.
  - name: connectionString
    value: "host=localhost port=5432 user=postgres password=password dbname=posdb sslmode=disable"

3. 数据库设置 (PostgreSQL) SQL 模式 (init.sql)

-- Product catalogue: one row per product, identified by an auto-incrementing id
-- and a unique SKU.
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    sku VARCHAR(50) UNIQUE NOT NULL,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    price DECIMAL(10,2) NOT NULL,
    stock_quantity INT NOT NULL DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Order headers: one row per order, identified by an auto-incrementing id
-- and a unique order number; status defaults to 'pending'.
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    order_number VARCHAR(20) UNIQUE NOT NULL,
    customer_id INT,
    total_amount DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Order lines: each row references an order and a product.
-- The foreign keys declare no ON DELETE action, so referencing rows must be
-- deleted before the order or product they point to.
CREATE TABLE order_items (
    id SERIAL PRIMARY KEY,
    order_id INT REFERENCES orders(id),
    product_id INT REFERENCES products(id),
    quantity INT NOT NULL,
    unit_price DECIMAL(10,2) NOT NULL,
    total_price DECIMAL(10,2) NOT NULL
);

4. 实现领域模型 Models/Product.cs

namespace PosBackend.Models;

/// <summary>
/// Product model; its properties correspond to the columns of the <c>products</c> table.
/// </summary>
public class Product
{
    /// <summary>Primary key (<c>id</c> column).</summary>
    public int Id { get; set; }

    /// <summary>Stock keeping unit (<c>sku</c> column); defaults to an empty string.</summary>
    public string Sku { get; set; } = string.Empty;

    /// <summary>Display name (<c>name</c> column); defaults to an empty string.</summary>
    public string Name { get; set; } = string.Empty;

    /// <summary>Optional description (<c>description</c> column); may be null.</summary>
    public string? Description { get; set; }

    /// <summary>Price (<c>price</c> column).</summary>
    public decimal Price { get; set; }

    /// <summary>Quantity in stock (<c>stock_quantity</c> column).</summary>
    public int StockQuantity { get; set; }

    /// <summary>Creation timestamp (<c>created_at</c> column).</summary>
    public DateTime CreatedAt { get; set; }
}

Models/Order.cs

namespace PosBackend.Models;

/// <summary>
/// Order model; its scalar properties correspond to the columns of the <c>orders</c> table
/// and <see cref="Items"/> holds the order's lines.
/// </summary>
public class Order
{
    /// <summary>Primary key (<c>id</c> column).</summary>
    public int Id { get; set; }

    /// <summary>Order number (<c>order_number</c> column); defaults to an empty string.</summary>
    public string OrderNumber { get; set; } = string.Empty;

    /// <summary>Optional customer id (<c>customer_id</c> column); may be null.</summary>
    public int? CustomerId { get; set; }

    /// <summary>Order total (<c>total_amount</c> column).</summary>
    public decimal TotalAmount { get; set; }

    /// <summary>Order status (<c>status</c> column); defaults to "pending".</summary>
    public string Status { get; set; } = "pending";

    /// <summary>Creation timestamp (<c>created_at</c> column).</summary>
    public DateTime CreatedAt { get; set; }

    /// <summary>Order lines; initialised to an empty list.</summary>
    public List<OrderItem> Items { get; set; } = new();
}

/// <summary>
/// Order line model; its scalar properties correspond to the columns of the
/// <c>order_items</c> table.
/// </summary>
public class OrderItem
{
    /// <summary>Primary key (<c>id</c> column).</summary>
    public int Id { get; set; }

    /// <summary>Id of the owning order (<c>order_id</c> column).</summary>
    public int OrderId { get; set; }

    /// <summary>Id of the ordered product (<c>product_id</c> column).</summary>
    public int ProductId { get; set; }

    /// <summary>Ordered quantity (<c>quantity</c> column).</summary>
    public int Quantity { get; set; }

    /// <summary>Price per unit (<c>unit_price</c> column).</summary>
    public decimal UnitPrice { get; set; }

    /// <summary>Line total (<c>total_price</c> column).</summary>
    public decimal TotalPrice { get; set; }

    /// <summary>The referenced product, when loaded; may be null.</summary>
    public Product? Product { get; set; }
}

5. 使用 Dapper 创建仓储层 Repositories/IRepository.cs

namespace PosBackend.Repositories;

/// <summary>
/// Generic asynchronous CRUD contract for an entity identified by an integer id.
/// </summary>
/// <typeparam name="T">Entity type handled by the repository.</typeparam>
public interface IRepository<T>
{
    /// <summary>Returns all entities.</summary>
    Task<IEnumerable<T>> GetAllAsync();

    /// <summary>Returns the entity with the given id, or null when there is none.</summary>
    Task<T?> GetByIdAsync(int id);

    /// <summary>Persists a new entity and returns an integer result (the implementations in this project return the generated id).</summary>
    Task<int> CreateAsync(T entity);

    /// <summary>Updates an existing entity and returns a success flag.</summary>
    Task<bool> UpdateAsync(T entity);

    /// <summary>Deletes the entity with the given id and returns a success flag.</summary>
    Task<bool> DeleteAsync(int id);
}

Repositories/ProductRepository.cs

using Dapper;
using PosBackend.Models;
using System.Data;

namespace PosBackend.Repositories
{
    /// <summary>
    /// Dapper-based repository for rows of the <c>products</c> table.
    /// </summary>
    public class ProductRepository : IRepository<Product>
    {
        // Columns are listed explicitly and the snake_case ones are aliased to the property
        // names. Dapper matches result columns to properties by name and, by default, does not
        // map stock_quantity/created_at onto StockQuantity/CreatedAt, so "SELECT *" left those
        // properties at their default values.
        private const string SelectProductSql =
            @"SELECT id, sku, name, description, price,
                     stock_quantity AS StockQuantity,
                     created_at AS CreatedAt
              FROM products";

        private readonly IDbConnection _db;

        /// <summary>
        /// Stores the connection and opens it if it is not already open.
        /// </summary>
        /// <param name="db">Connection used for every query issued by this repository.</param>
        public ProductRepository(IDbConnection db)
        {
            _db = db;

            // Make sure the connection is open before the first query.
            if (_db.State != ConnectionState.Open)
            {
                _db.Open();
            }
        }

        /// <summary>Returns every product.</summary>
        public async Task<IEnumerable<Product>> GetAllAsync()
        {
            return await _db.QueryAsync<Product>(SelectProductSql);
        }

        /// <summary>Returns the product with the given id, or null when no row matches.</summary>
        /// <param name="id">Primary key of the product.</param>
        public async Task<Product?> GetByIdAsync(int id)
        {
            return await _db.QueryFirstOrDefaultAsync<Product>(
                SelectProductSql + " WHERE id = @Id", new { Id = id });
        }

        /// <summary>Inserts a product and returns the id generated by the database.</summary>
        /// <param name="product">Product whose Sku, Name, Description, Price and StockQuantity are stored.</param>
        public async Task<int> CreateAsync(Product product)
        {
            var sql = @"INSERT INTO products (sku, name, description, price, stock_quantity) 
                    VALUES (@Sku, @Name, @Description, @Price, @StockQuantity)
                    RETURNING id";
            return await _db.ExecuteScalarAsync<int>(sql, product);
        }

        /// <summary>Updates the row whose id equals <c>product.Id</c>.</summary>
        /// <returns>True when at least one row was updated.</returns>
        public async Task<bool> UpdateAsync(Product product)
        {
            var affectedRows = await _db.ExecuteAsync(
                @"UPDATE products SET 
                sku = @Sku, 
                name = @Name, 
                description = @Description, 
                price = @Price, 
                stock_quantity = @StockQuantity 
            WHERE id = @Id", product);
            return affectedRows > 0;
        }

        /// <summary>Deletes the product with the given id.</summary>
        /// <returns>True when at least one row was deleted.</returns>
        public async Task<bool> DeleteAsync(int id)
        {
            var affectedRows = await _db.ExecuteAsync(
                "DELETE FROM products WHERE id = @Id", new { Id = id });
            return affectedRows > 0;
        }
    }
}

现在创建订单仓储 Repositories/OrdersRepository.cs:

using Dapper;
using PosBackend.Models;
using System.Data;

namespace PosBackend.Repositories
{
    /// <summary>
    /// Dapper-based repository for the <c>orders</c> table and its <c>order_items</c> rows.
    /// </summary>
    public class OrdersRepository : IRepository<Order>
    {
        // Columns are listed explicitly and the snake_case ones are aliased to the property
        // names. Dapper matches result columns to properties by name and, by default, does not
        // map order_number/customer_id/total_amount/created_at onto the PascalCase properties,
        // so "SELECT *" left those properties at their default values.
        private const string SelectOrderSql =
            @"SELECT id,
                     order_number AS OrderNumber,
                     customer_id AS CustomerId,
                     total_amount AS TotalAmount,
                     status,
                     created_at AS CreatedAt
              FROM orders";

        private readonly IDbConnection _db;

        /// <summary>
        /// Stores the connection and opens it if it is not already open.
        /// </summary>
        /// <param name="db">Connection used for every query issued by this repository.</param>
        public OrdersRepository(IDbConnection db)
        {
            _db = db;

            // Make sure the connection is open before the first query.
            if (_db.State != ConnectionState.Open)
            {
                _db.Open();
            }
        }

        /// <summary>Returns every order with its items loaded.</summary>
        public async Task<IEnumerable<Order>> GetAllAsync()
        {
            // Materialise once so the items assigned below land on the objects that are returned.
            var orders = (await _db.QueryAsync<Order>(SelectOrderSql)).ToList();

            // Load the items of each order (one query per order).
            foreach (var order in orders)
            {
                order.Items = (await GetOrderItemsAsync(order.Id)).ToList();
            }

            return orders;
        }

        /// <summary>Returns the order with the given id and its items, or null when no row matches.</summary>
        /// <param name="id">Primary key of the order.</param>
        public async Task<Order?> GetByIdAsync(int id)
        {
            var order = await _db.QueryFirstOrDefaultAsync<Order>(
                SelectOrderSql + " WHERE id = @Id", new { Id = id });

            if (order != null)
            {
                order.Items = (await GetOrderItemsAsync(order.Id)).ToList();
            }

            return order;
        }

        /// <summary>Returns the order with the given order number and its items, or null when no row matches.</summary>
        /// <param name="orderNumber">Value of the <c>order_number</c> column.</param>
        public async Task<Order?> GetByOrderNumberAsync(string orderNumber)
        {
            var order = await _db.QueryFirstOrDefaultAsync<Order>(
                SelectOrderSql + " WHERE order_number = @OrderNumber",
                new { OrderNumber = orderNumber });

            if (order != null)
            {
                order.Items = (await GetOrderItemsAsync(order.Id)).ToList();
            }

            return order;
        }

        /// <summary>
        /// Inserts the order and all of its items in one transaction.
        /// </summary>
        /// <param name="order">Order to insert; its Id and each item's OrderId are set to the generated id.</param>
        /// <returns>The id generated for the order.</returns>
        /// <remarks>On any exception the transaction is rolled back and the exception is rethrown.</remarks>
        public async Task<int> CreateAsync(Order order)
        {
            using var transaction = _db.BeginTransaction();

            try
            {
                // Insert the order header.
                var sql = @"INSERT INTO orders 
                        (order_number, customer_id, total_amount, status) 
                        VALUES (@OrderNumber, @CustomerId, @TotalAmount, @Status)
                        RETURNING id";

                order.Id = await _db.ExecuteScalarAsync<int>(sql, order, transaction);

                // Insert the order items.
                foreach (var item in order.Items)
                {
                    item.OrderId = order.Id;
                    await CreateOrderItemAsync(item, transaction);
                }

                transaction.Commit();
                return order.Id;
            }
            catch
            {
                transaction.Rollback();
                throw;
            }
        }

        /// <summary>Updates customer id, total amount and status of the row whose id equals <c>order.Id</c>.</summary>
        /// <returns>True when at least one row was updated.</returns>
        public async Task<bool> UpdateAsync(Order order)
        {
            var sql = @"UPDATE orders SET 
                    customer_id = @CustomerId,
                    total_amount = @TotalAmount,
                    status = @Status
                WHERE id = @Id";

            var affectedRows = await _db.ExecuteAsync(sql, order);
            return affectedRows > 0;
        }

        /// <summary>Deletes the order and its items in one transaction.</summary>
        /// <param name="id">Primary key of the order.</param>
        /// <returns>True when the order row was deleted.</returns>
        /// <remarks>On any exception the transaction is rolled back and the exception is rethrown.</remarks>
        public async Task<bool> DeleteAsync(int id)
        {
            using var transaction = _db.BeginTransaction();

            try
            {
                // Delete the items first: they reference the order row.
                await _db.ExecuteAsync(
                    "DELETE FROM order_items WHERE order_id = @OrderId",
                    new { OrderId = id },
                    transaction);

                // Then delete the order itself.
                var affectedRows = await _db.ExecuteAsync(
                    "DELETE FROM orders WHERE id = @Id",
                    new { Id = id },
                    transaction);

                transaction.Commit();
                return affectedRows > 0;
            }
            catch
            {
                transaction.Rollback();
                throw;
            }
        }

        /// <summary>Sets the status of the order with the given id.</summary>
        /// <returns>True when at least one row was updated.</returns>
        public async Task<bool> UpdateOrderStatusAsync(int orderId, string status)
        {
            var affectedRows = await _db.ExecuteAsync(
                "UPDATE orders SET status = @Status WHERE id = @OrderId",
                new { OrderId = orderId, Status = status });

            return affectedRows > 0;
        }

        /// <summary>Returns the items of an order, each with its product attached when one is joined.</summary>
        private async Task<IEnumerable<OrderItem>> GetOrderItemsAsync(int orderId)
        {
            // Explicit, aliased columns for both halves of the join (see SelectOrderSql for why).
            // The product half starts at p.id, which is the column Dapper splits on.
            var sql = @"SELECT oi.id,
                           oi.order_id AS OrderId,
                           oi.product_id AS ProductId,
                           oi.quantity,
                           oi.unit_price AS UnitPrice,
                           oi.total_price AS TotalPrice,
                           p.id, p.sku, p.name, p.description, p.price,
                           p.stock_quantity AS StockQuantity,
                           p.created_at AS CreatedAt
                    FROM order_items oi
                    LEFT JOIN products p ON oi.product_id = p.id
                    WHERE oi.order_id = @OrderId";

            return await _db.QueryAsync<OrderItem, Product, OrderItem>(sql,
                (orderItem, product) =>
                {
                    orderItem.Product = product;
                    return orderItem;
                },
                new { OrderId = orderId },
                splitOn: "id");
        }

        /// <summary>Inserts one order item inside the given transaction.</summary>
        private async Task CreateOrderItemAsync(OrderItem item, IDbTransaction transaction)
        {
            var sql = @"INSERT INTO order_items 
                    (order_id, product_id, quantity, unit_price, total_price)
                    VALUES (@OrderId, @ProductId, @Quantity, @UnitPrice, @TotalPrice)";

            await _db.ExecuteAsync(sql, item, transaction);
        }
    }
}

6. 使用 Dapr 状态管理实现服务层 Services/PosService.cs

using Dapr.Client;
using PosBackend.Models;
using PosBackend.Repositories;

namespace PosBackend.Services
{
    /// <summary>
    /// Application service for products and orders. Products go straight to the product
    /// repository; orders are kept in the Dapr state store and announced through Dapr pub/sub,
    /// with separate methods that persist them through the orders repository.
    /// </summary>
    public class PosService
    {
        private readonly OrdersRepository _ordersRepo;
        private readonly ProductRepository _productRepo;
        private readonly DaprClient _daprClient;
        private const string StoreName = "statestore";

        public PosService(OrdersRepository ordersRepo, ProductRepository productRepo, DaprClient daprClient)
        {
            _ordersRepo = ordersRepo;
            _productRepo = productRepo;
            _daprClient = daprClient;
        }

        // Product operations

        /// <summary>Returns every product from the repository.</summary>
        public async Task<IEnumerable<Product>> GetProductsAsync() =>
            await _productRepo.GetAllAsync();

        /// <summary>Returns the product with the given id, or null when it does not exist.</summary>
        public async Task<Product?> GetProductAsync(int id) =>
            await _productRepo.GetByIdAsync(id);

        /// <summary>Stores the product and returns it with its generated id assigned.</summary>
        public async Task<Product> CreateProductAsync(Product product)
        {
            product.Id = await _productRepo.CreateAsync(product);
            return product;
        }

        // Order operations backed by Dapr state management

        /// <summary>Reads an order from the Dapr state store.</summary>
        /// <param name="orderId">State key; orders are saved under their order number.</param>
        public async Task<Order?> GetOrderAsync(string orderId)
        {
            return await _daprClient.GetStateAsync<Order>(StoreName, orderId);
        }

        /// <summary>
        /// Assigns an order number, creation time and total to the order, saves it to the
        /// state store and publishes it on the "orderCreated" topic.
        /// </summary>
        /// <returns>The same order instance with the generated values filled in.</returns>
        public async Task<Order> CreateOrderAsync(Order order)
        {
            // Generate the order number: UTC timestamp plus the first four characters of a GUID.
            order.OrderNumber = $"ORD-{DateTime.UtcNow:yyyyMMdd-HHmmss}-{Guid.NewGuid().ToString()[..4]}";
            order.CreatedAt = DateTime.UtcNow;

            // The total is the sum of the line totals.
            order.TotalAmount = order.Items.Sum(i => i.TotalPrice);

            // Save to the Dapr state store, keyed by order number.
            Console.WriteLine($"保存到 Dapr 状态存储 {order.OrderNumber}");

            await _daprClient.SaveStateAsync(StoreName, order.OrderNumber, order);

            // Publish the order-created event.
            Console.WriteLine($"发布订单创建事件 {order.OrderNumber}");
            await _daprClient.PublishEventAsync("pubsub", "orderCreated", order);

            return order;
        }

        /// <summary>
        /// Changes the status of an order held in the state store and publishes an
        /// "orderStatusUpdated" event carrying the order number and the new status.
        /// </summary>
        /// <returns>False when no order is stored under <paramref name="orderNumber"/>; otherwise true.</returns>
        public async Task<bool> UpdateOrderStatusAsync(string orderNumber, string status)
        {
            var order = await GetOrderAsync(orderNumber);
            if (order == null)
            {
                return false;
            }

            order.Status = status;
            await _daprClient.SaveStateAsync(StoreName, orderNumber, order);

            // Publish the status-updated event.
            await _daprClient.PublishEventAsync("pubsub", "orderStatusUpdated",
                new { OrderNumber = orderNumber, Status = status });

            return true;
        }

        /// <summary>Inserts the order and its items through the orders repository.</summary>
        /// <returns>True when the repository returned a positive id; false otherwise, including when the insert threw.</returns>
        public async Task<bool> SaveOrderAsync(Order order)
        {
            try
            {
                return await _ordersRepo.CreateAsync(order) > 0;
            }
            catch (Exception ex)
            {
                // Best effort: the failure is reported on the console and signalled via the return value.
                Console.WriteLine($"保存订单时出错: {ex.Message}");
                return false;
            }
        }

        /// <summary>Updates an order through the orders repository.</summary>
        /// <param name="order">
        /// Order to update. With a positive Id the row is updated by id. Without one, the row is
        /// looked up by OrderNumber and only its status is updated, because the
        /// "orderStatusUpdated" event carries just the order number and the status.
        /// </param>
        /// <returns>True when a row was updated (previously this method always returned false).</returns>
        public async Task<bool> UpdateOrderAsync(Order order)
        {
            if (order.Id > 0)
            {
                return await _ordersRepo.UpdateAsync(order);
            }

            if (string.IsNullOrWhiteSpace(order.OrderNumber))
            {
                return false;
            }

            var existing = await _ordersRepo.GetByOrderNumberAsync(order.OrderNumber);
            if (existing is null)
            {
                return false;
            }

            return await _ordersRepo.UpdateOrderStatusAsync(existing.Id, order.Status);
        }
    }
}

7. 创建 API 控制器 Controllers/ProductsController.cs

using Microsoft.AspNetCore.Mvc;
using PosBackend.Models;
using PosBackend.Services;

namespace PosBackend.Controllers;

/// <summary>
/// HTTP endpoints for listing, fetching and creating products; every action delegates to
/// <see cref="PosService"/>.
/// </summary>
[ApiController]
[Route("[controller]")]
public class ProductsController : ControllerBase
{
    private readonly PosService _posService;

    public ProductsController(PosService posService) => _posService = posService;

    /// <summary>Returns all products with a 200 response.</summary>
    [HttpGet]
    public async Task<ActionResult<IEnumerable<Product>>> GetProducts()
        => Ok(await _posService.GetProductsAsync());

    /// <summary>Returns the product with the given id, or 404 when the service finds none.</summary>
    [HttpGet("{id}")]
    public async Task<ActionResult<Product>> GetProduct(int id)
    {
        var found = await _posService.GetProductAsync(id);
        if (found == null)
        {
            return NotFound();
        }

        return Ok(found);
    }

    /// <summary>Creates a product and answers 201 with a Location pointing at <see cref="GetProduct"/>.</summary>
    [HttpPost]
    public async Task<ActionResult<Product>> CreateProduct(Product product)
    {
        var saved = await _posService.CreateProductAsync(product);
        var routeValues = new { id = saved.Id };
        return CreatedAtAction(nameof(GetProduct), routeValues, saved);
    }
}

Controllers/OrdersController.cs

using Dapr;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using PosBackend.Models;
using PosBackend.Repositories;
using PosBackend.Services;

namespace PosBackend.Controllers
{
    /// <summary>
    /// HTTP endpoints for orders plus the Dapr pub/sub subscribers for the "orderCreated" and
    /// "orderStatusUpdated" topics; every action delegates to <see cref="PosService"/>.
    /// </summary>
    [Route("api/[controller]")]
    [ApiController]
    public class OrdersController : ControllerBase
    {
        private readonly PosService _posService;

        public OrdersController(PosService posService)
        {
            _posService = posService;
        }

        /// <summary>Returns the order stored under the given order number, or 404 when there is none.</summary>
        [HttpGet("{orderNumber}")]
        public async Task<ActionResult<Order>> GetOrder(string orderNumber)
        {
            var order = await _posService.GetOrderAsync(orderNumber);
            return order == null ? NotFound() : Ok(order);
        }

        /// <summary>Creates an order and answers 201 with a Location pointing at <see cref="GetOrder"/>.</summary>
        [HttpPost]
        public async Task<ActionResult<Order>> CreateOrder(Order order)
        {
            var createdOrder = await _posService.CreateOrderAsync(order);

            return CreatedAtAction(nameof(GetOrder), new { orderNumber = createdOrder.OrderNumber }, createdOrder);
        }

        /// <summary>Changes the status of an order; 204 on success, 404 when the order is unknown.</summary>
        [HttpPatch("{orderNumber}/status")]
        public async Task<IActionResult> UpdateOrderStatus(string orderNumber, [FromBody] string status)
        {
            var success = await _posService.UpdateOrderStatusAsync(orderNumber, status);
            return success ? NoContent() : NotFound();
        }

        /// <summary>
        /// Dapr subscriber for "orderCreated": persists the received order through the service.
        /// </summary>
        /// <returns>400 when the payload identifies no order; otherwise 200.</returns>
        [Topic("pubsub", "orderCreated")] // Dapr subscription
        [HttpPost("orderCreated")]
        public async Task<IActionResult> HandleOrderCreateEvent([FromBody] Order order)
        {
            // Validate before touching the payload. A freshly created order is published without
            // a database id (PosService.CreateOrderAsync never assigns one), so the order number
            // is accepted as identification as well; requiring Id > 0 rejected every event.
            if (order == null || (order.Id <= 0 && string.IsNullOrWhiteSpace(order.OrderNumber)))
            {
                return BadRequest("无效的订单数据");
            }

            Console.WriteLine($"HandleOrderCreateEvent 处理订单创建事件 {order.OrderNumber}");

            // Save to the database.
            await _posService.SaveOrderAsync(order);

            return Ok();
        }

        /// <summary>
        /// Dapr subscriber for "orderStatusUpdated": forwards the received data to the service.
        /// </summary>
        /// <returns>400 when the payload identifies no order; otherwise 200.</returns>
        [Topic("pubsub", "orderStatusUpdated")] // Dapr subscription
        [HttpPost("orderStatusUpdated")]
        public async Task<IActionResult> HandleOrderUpdateEvent([FromBody] Order order)
        {
            // Validate before touching the payload. The status event is published with only the
            // order number and the new status (PosService.UpdateOrderStatusAsync), so it binds
            // with Id = 0; requiring Id > 0 rejected every event.
            if (order == null || (order.Id <= 0 && string.IsNullOrWhiteSpace(order.OrderNumber)))
            {
                return BadRequest("无效的订单数据");
            }

            Console.WriteLine($"HandleOrderUpdateEvent 处理订单状态更新事件 {order.OrderNumber}");

            // Update the order in the database.
            await _posService.UpdateOrderAsync(order);

            return Ok();
        }
    }
}

8. 配置依赖注入 Program.cs

using Npgsql;
using PosBackend.Repositories;
using PosBackend.Services;
using System.Data;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// AddDapr() enables Dapr integration for controllers and registers DaprClient.
// No sidecar endpoints are hard-coded: the client's defaults take the sidecar address from
// the DAPR_HTTP_PORT / DAPR_GRPC_PORT environment variables that `dapr run` sets, and fall
// back to ports 3500 / 50001 on the local machine.
builder.Services.AddControllers().AddDapr();

// Configure the PostgreSQL connection. Fail at startup with a clear message when the
// connection string is missing instead of failing on the first request.
var connectionString = builder.Configuration.GetConnectionString("PostgreSQL")
    ?? throw new InvalidOperationException("Connection string 'PostgreSQL' is not configured.");
builder.Services.AddScoped<IDbConnection>(_ =>
{
    // One open connection per request scope.
    var connection = new NpgsqlConnection(connectionString);
    connection.Open();
    return connection;
});

// Register repositories.
builder.Services.AddScoped<OrdersRepository>();
builder.Services.AddScoped<ProductRepository>();
// Register services.
builder.Services.AddScoped<PosService>();

var app = builder.Build();

// Configure the HTTP request pipeline.
// Swagger is registered once and only in Development; the former unconditional second
// registration duplicated the middleware and exposed the UI in every environment.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

// Unwrap CloudEvents envelopes so pub/sub payloads bind to the action parameters.
app.UseCloudEvents();

app.UseAuthorization();
app.MapControllers();

// Expose the endpoint Dapr calls to discover [Topic] subscriptions.
app.MapSubscribeHandler();

app.Run();

9. 添加配置 appsettings.json

{
  "ConnectionStrings": {
    "PostgreSQL": "User ID=postgres;Password=password;Host=localhost;Port=5432;Database=posdb;"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*"
}

部署与运行

  1. 启动 PostgreSQL 并创建数据库 (参考步骤2中的docker命令),然后在 posdb 数据库中执行步骤3的 init.sql 以创建表。
  2. 使用 Dapr sidecar 运行应用程序:
    # Variant with explicit ports:
    # dapr run --app-id pos-backend --app-port 5000 --dapr-http-port 3500 --resources-path ./components -- dotnet run
    # Example command (adjust --app-port to the port the application listens on).
    # --resources-path replaces the deprecated --components-path flag.
    dapr run --app-id PosBackend --app-port 5194 --resources-path ./components --enable-profiling=false -- dotnet run
    

感谢阅读我的文章,现在您已准备好构建下一代高性能企业解决方案,本文就是您的起点。我们展示了 Dapr 如何简化微服务通信和状态管理,Dapper 如何提供快速轻量级的数据库访问,以及 PostgreSQL 与 .NET 10 如何利用现代、可扩展的工具赋能您的后端。无论您是为了速度、可扩展性还是简单性进行架构设计——本指南都奠定了坚实的基础。让我们携手构建更智能的微服务。

相关留言评论
昵称:
邮箱:
阅读排行