本文向您展示如何将 Dapr 集成到微服务应用程序中并管理服务间的通信。我们将以 POS(销售点)应用后端为例进行演示。
解决方案架构
POS 后端微服务
├── Dapr (用于分布式应用运行时)
├── PostgreSQL (数据存储)
├── Redis (Pub/Sub 消息代理)
├── Dapper (微型 ORM)
└── .NET 10 (运行时)
实现步骤
1. 创建 .NET 10 项目
# Scaffold the Web API project targeting .NET 10
dotnet new webapi -n PosBackend -f net10.0
cd PosBackend
# Dapr ASP.NET Core integration, the Dapper micro-ORM and the PostgreSQL ADO.NET driver
dotnet add package Dapr.AspNetCore
dotnet add package Dapper
dotnet add package Npgsql
# Program.cs calls AddSwaggerGen/UseSwagger/UseSwaggerUI; the .NET 9+ webapi template
# no longer references Swashbuckle, so it has to be added explicitly
dotnet add package Swashbuckle.AspNetCore
# Pin the SDK version used by this project
dotnet new globaljson --sdk-version 10.0.100-preview.6.25358.103
2. 配置 Dapr 组件
在 components 文件夹中创建以下文件:
pubsub.yaml
# Dapr pub/sub component named "pubsub", backed by Redis.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    # Address of the Redis instance used as the message broker
    - name: redisHost
      value: "localhost:6379"
如果未安装 PostgreSQL 数据库,请安装:
docker run -d --name pgdb -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=password -e POSTGRES_DB=posdb -p 5432:5432 postgres:15
statestore.yaml
# Dapr state store component named "statestore", backed by PostgreSQL.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.postgresql
  version: v1
  initTimeout: 30s # raised from the default of 5 seconds
  metadata:
    # Credentials must match the PostgreSQL instance (POSTGRES_USER / POSTGRES_PASSWORD /
    # POSTGRES_DB of the container started for this walkthrough).
    - name: connectionString
      value: "host=localhost port=5432 user=postgres password=password dbname=posdb sslmode=disable"
3. 数据库设置 (PostgreSQL):SQL 架构脚本 (init.sql)
-- Product catalogue: one row per sellable item, identified by a unique SKU.
CREATE TABLE products (
id SERIAL PRIMARY KEY,
sku VARCHAR(50) UNIQUE NOT NULL,
name VARCHAR(100) NOT NULL,
description TEXT,
price DECIMAL(10,2) NOT NULL,
stock_quantity INT NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Order header: unique order number, optional customer, total and status
-- (status defaults to 'pending').
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
order_number VARCHAR(20) UNIQUE NOT NULL,
customer_id INT,
total_amount DECIMAL(10,2) NOT NULL,
status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Order lines: each row references one order and one product.
-- The foreign keys declare no ON DELETE action, so child rows must be removed
-- before the referenced order or product row.
CREATE TABLE order_items (
id SERIAL PRIMARY KEY,
order_id INT REFERENCES orders(id),
product_id INT REFERENCES products(id),
quantity INT NOT NULL,
unit_price DECIMAL(10,2) NOT NULL,
total_price DECIMAL(10,2) NOT NULL
);
4. 实现领域模型 Models/Product.cs
namespace PosBackend.Models;
/// <summary>
/// A sellable product. The property names mirror the columns of the <c>products</c> table.
/// </summary>
public class Product
{
    /// <summary>Numeric identifier of the product.</summary>
    public int Id { get; set; }

    /// <summary>Stock keeping unit; defaults to an empty string.</summary>
    public string Sku { get; set; } = string.Empty;

    /// <summary>Display name; defaults to an empty string.</summary>
    public string Name { get; set; } = string.Empty;

    /// <summary>Optional free-text description.</summary>
    public string? Description { get; set; }

    /// <summary>Unit price.</summary>
    public decimal Price { get; set; }

    /// <summary>Number of units in stock.</summary>
    public int StockQuantity { get; set; }

    /// <summary>Creation timestamp of the product record.</summary>
    public DateTime CreatedAt { get; set; }
}
Models/Order.cs
namespace PosBackend.Models;
/// <summary>
/// An order header together with its line items.
/// </summary>
public class Order
{
    /// <summary>Numeric identifier of the order.</summary>
    public int Id { get; set; }

    /// <summary>Order number; defaults to an empty string.</summary>
    public string OrderNumber { get; set; } = string.Empty;

    /// <summary>Optional customer identifier.</summary>
    public int? CustomerId { get; set; }

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; set; }

    /// <summary>Order status; defaults to <c>"pending"</c>.</summary>
    public string Status { get; set; } = "pending";

    /// <summary>Creation timestamp of the order.</summary>
    public DateTime CreatedAt { get; set; }

    /// <summary>Line items of the order; starts as an empty list.</summary>
    public List<OrderItem> Items { get; set; } = new();
}
/// <summary>
/// A single line of an order: one product, a quantity and the prices charged.
/// </summary>
public class OrderItem
{
    /// <summary>Numeric identifier of the line item.</summary>
    public int Id { get; set; }

    /// <summary>Identifier of the order this line belongs to.</summary>
    public int OrderId { get; set; }

    /// <summary>Identifier of the ordered product.</summary>
    public int ProductId { get; set; }

    /// <summary>Number of units ordered.</summary>
    public int Quantity { get; set; }

    /// <summary>Price per unit.</summary>
    public decimal UnitPrice { get; set; }

    /// <summary>Total price of the line.</summary>
    public decimal TotalPrice { get; set; }

    /// <summary>The referenced product, when it has been loaded; otherwise <c>null</c>.</summary>
    public Product? Product { get; set; }
}
5. 使用 Dapper 创建仓储层 Repositories/IRepository.cs
namespace PosBackend.Repositories;
/// <summary>
/// Asynchronous CRUD contract for entities of type <typeparamref name="T"/>
/// that are addressed by an integer id.
/// </summary>
/// <typeparam name="T">Entity type handled by the repository.</typeparam>
public interface IRepository<T>
{
    /// <summary>Returns every stored entity.</summary>
    Task<IEnumerable<T>> GetAllAsync();

    /// <summary>Returns the entity with the given id, or <c>null</c> when there is none.</summary>
    /// <param name="id">Identifier to look up.</param>
    Task<T?> GetByIdAsync(int id);

    /// <summary>Stores a new entity.</summary>
    /// <param name="entity">Entity to store.</param>
    /// <returns>An integer result; presumably the identifier assigned to the new entity.</returns>
    Task<int> CreateAsync(T entity);

    /// <summary>Updates an existing entity.</summary>
    /// <param name="entity">Entity carrying the new values.</param>
    /// <returns>A success flag; presumably <c>true</c> when an entity was updated.</returns>
    Task<bool> UpdateAsync(T entity);

    /// <summary>Deletes the entity with the given id.</summary>
    /// <param name="id">Identifier of the entity to delete.</param>
    /// <returns>A success flag; presumably <c>true</c> when an entity was deleted.</returns>
    Task<bool> DeleteAsync(int id);
}
Repositories/ProductRepository.cs
using Dapper;
using PosBackend.Models;
using System.Data;
namespace PosBackend.Repositories
{
    /// <summary>
    /// Dapper-based <see cref="IRepository{T}"/> implementation for the <c>products</c> table.
    /// </summary>
    public class ProductRepository : IRepository<Product>
    {
        private readonly IDbConnection _db;

        /// <summary>
        /// Keeps the supplied connection and opens it when it is not already open.
        /// </summary>
        /// <param name="db">Database connection used for every query of this repository.</param>
        public ProductRepository(IDbConnection db)
        {
            _db = db;
            if (_db.State == ConnectionState.Open)
            {
                return;
            }

            _db.Open();
        }

        /// <summary>Reads every row of <c>products</c>.</summary>
        public async Task<IEnumerable<Product>> GetAllAsync()
        {
            const string selectAll = "SELECT * FROM products";
            return await _db.QueryAsync<Product>(selectAll);
        }

        /// <summary>Reads the product with the given id, or <c>null</c> when no row matches.</summary>
        /// <param name="id">Product id to look up.</param>
        public async Task<Product?> GetByIdAsync(int id)
        {
            const string selectOne = "SELECT * FROM products WHERE id = @Id";
            var args = new { Id = id };
            return await _db.QueryFirstOrDefaultAsync<Product>(selectOne, args);
        }

        /// <summary>Inserts a product and returns the id produced by <c>RETURNING id</c>.</summary>
        /// <param name="product">Product whose properties supply the statement parameters.</param>
        public async Task<int> CreateAsync(Product product)
        {
            const string insertSql = @"INSERT INTO products (sku, name, description, price, stock_quantity)
VALUES (@Sku, @Name, @Description, @Price, @StockQuantity)
RETURNING id";
            var newId = await _db.ExecuteScalarAsync<int>(insertSql, product);
            return newId;
        }

        /// <summary>Overwrites the editable columns of the row whose id equals <c>product.Id</c>.</summary>
        /// <param name="product">Product carrying the id and the new column values.</param>
        /// <returns><c>true</c> when at least one row was affected.</returns>
        public async Task<bool> UpdateAsync(Product product)
        {
            const string updateSql = @"UPDATE products SET
sku = @Sku,
name = @Name,
description = @Description,
price = @Price,
stock_quantity = @StockQuantity
WHERE id = @Id";
            var changed = await _db.ExecuteAsync(updateSql, product);
            return changed > 0;
        }

        /// <summary>Deletes the product with the given id.</summary>
        /// <param name="id">Id of the product to delete.</param>
        /// <returns><c>true</c> when at least one row was affected.</returns>
        public async Task<bool> DeleteAsync(int id)
        {
            const string deleteSql = "DELETE FROM products WHERE id = @Id";
            var removed = await _db.ExecuteAsync(deleteSql, new { Id = id });
            return removed > 0;
        }
    }
}
现在创建订单仓储 (Repositories/OrdersRepository.cs):
using Dapper;
using PosBackend.Models;
using System.Data;
namespace PosBackend.Repositories
{
    /// <summary>
    /// Dapper-based <see cref="IRepository{T}"/> implementation for the <c>orders</c> table
    /// and its <c>order_items</c> child rows.
    /// </summary>
    public class OrdersRepository : IRepository<Order>
    {
        private readonly IDbConnection _db;

        /// <summary>
        /// Keeps the supplied connection and opens it when it is not already open.
        /// </summary>
        /// <param name="db">Database connection used for every query of this repository.</param>
        public OrdersRepository(IDbConnection db)
        {
            _db = db;
            if (_db.State == ConnectionState.Open)
            {
                return;
            }

            _db.Open();
        }

        /// <summary>Reads every order and loads the items of each one with a separate query.</summary>
        public async Task<IEnumerable<Order>> GetAllAsync()
        {
            var allOrders = await _db.QueryAsync<Order>("SELECT * FROM orders");
            foreach (var current in allOrders)
            {
                await PopulateItemsAsync(current);
            }

            return allOrders;
        }

        /// <summary>Reads the order with the given id, including its items; <c>null</c> when absent.</summary>
        /// <param name="id">Order id to look up.</param>
        public async Task<Order?> GetByIdAsync(int id)
        {
            var found = await _db.QueryFirstOrDefaultAsync<Order>(
                "SELECT * FROM orders WHERE id = @Id",
                new { Id = id });
            if (found == null)
            {
                return found;
            }

            await PopulateItemsAsync(found);
            return found;
        }

        /// <summary>Reads the order with the given order number, including its items; <c>null</c> when absent.</summary>
        /// <param name="orderNumber">Order number to look up.</param>
        public async Task<Order?> GetByOrderNumberAsync(string orderNumber)
        {
            var found = await _db.QueryFirstOrDefaultAsync<Order>(
                "SELECT * FROM orders WHERE order_number = @OrderNumber",
                new { OrderNumber = orderNumber });
            if (found == null)
            {
                return found;
            }

            await PopulateItemsAsync(found);
            return found;
        }

        /// <summary>
        /// Inserts the order and all of its items inside one transaction.
        /// The generated id is written back to <c>order.Id</c> and to every item's <c>OrderId</c>.
        /// </summary>
        /// <param name="order">Order to insert.</param>
        /// <returns>The id produced by <c>RETURNING id</c>.</returns>
        public async Task<int> CreateAsync(Order order)
        {
            using var tx = _db.BeginTransaction();
            try
            {
                // Header row first: the generated id is needed by the item rows.
                const string insertOrderSql = @"INSERT INTO orders
(order_number, customer_id, total_amount, status)
VALUES (@OrderNumber, @CustomerId, @TotalAmount, @Status)
RETURNING id";
                var newId = await _db.ExecuteScalarAsync<int>(insertOrderSql, order, tx);
                order.Id = newId;

                // Then one insert per line item.
                foreach (var line in order.Items)
                {
                    line.OrderId = newId;
                    await CreateOrderItemAsync(line, tx);
                }

                tx.Commit();
                return newId;
            }
            catch
            {
                // Undo the partial insert and let the caller see the original exception.
                tx.Rollback();
                throw;
            }
        }

        /// <summary>Updates customer, total and status of the order row whose id equals <c>order.Id</c>.</summary>
        /// <param name="order">Order carrying the id and the new column values.</param>
        /// <returns><c>true</c> when at least one row was affected.</returns>
        public async Task<bool> UpdateAsync(Order order)
        {
            const string updateSql = @"UPDATE orders SET
customer_id = @CustomerId,
total_amount = @TotalAmount,
status = @Status
WHERE id = @Id";
            var changed = await _db.ExecuteAsync(updateSql, order);
            return changed > 0;
        }

        /// <summary>Deletes an order and its items inside one transaction.</summary>
        /// <param name="id">Id of the order to delete.</param>
        /// <returns><c>true</c> when the order row itself was deleted.</returns>
        public async Task<bool> DeleteAsync(int id)
        {
            using var tx = _db.BeginTransaction();
            try
            {
                // Item rows go first, before the order row they reference.
                await _db.ExecuteAsync(
                    "DELETE FROM order_items WHERE order_id = @OrderId",
                    new { OrderId = id },
                    tx);

                // Then the order row.
                var removed = await _db.ExecuteAsync(
                    "DELETE FROM orders WHERE id = @Id",
                    new { Id = id },
                    tx);

                tx.Commit();
                return removed > 0;
            }
            catch
            {
                tx.Rollback();
                throw;
            }
        }

        /// <summary>Sets only the status column of the given order.</summary>
        /// <param name="orderId">Id of the order to change.</param>
        /// <param name="status">New status value.</param>
        /// <returns><c>true</c> when at least one row was affected.</returns>
        public async Task<bool> UpdateOrderStatusAsync(int orderId, string status)
        {
            var args = new { OrderId = orderId, Status = status };
            var changed = await _db.ExecuteAsync(
                "UPDATE orders SET status = @Status WHERE id = @OrderId",
                args);
            return changed > 0;
        }

        /// <summary>Replaces <c>order.Items</c> with the items read for <c>order.Id</c>.</summary>
        private async Task PopulateItemsAsync(Order order)
        {
            var lines = await GetOrderItemsAsync(order.Id);
            order.Items = lines.ToList();
        }

        /// <summary>
        /// Reads the items of one order joined with their products; each item's
        /// <c>Product</c> is set from the joined columns (split on <c>id</c>).
        /// </summary>
        private async Task<IEnumerable<OrderItem>> GetOrderItemsAsync(int orderId)
        {
            const string selectSql = @"SELECT oi.*, p.*
FROM order_items oi
LEFT JOIN products p ON oi.product_id = p.id
WHERE oi.order_id = @OrderId";

            static OrderItem Attach(OrderItem line, Product joined)
            {
                line.Product = joined;
                return line;
            }

            return await _db.QueryAsync<OrderItem, Product, OrderItem>(
                selectSql,
                Attach,
                new { OrderId = orderId },
                splitOn: "id");
        }

        /// <summary>Inserts one item row as part of the given transaction.</summary>
        private async Task CreateOrderItemAsync(OrderItem item, IDbTransaction transaction)
        {
            const string insertItemSql = @"INSERT INTO order_items
(order_id, product_id, quantity, unit_price, total_price)
VALUES (@OrderId, @ProductId, @Quantity, @UnitPrice, @TotalPrice)";
            await _db.ExecuteAsync(insertItemSql, item, transaction);
        }
    }
}
6. 使用 Dapr 状态管理实现服务层 Services/PosService.cs
using Dapr.Client;
using PosBackend.Models;
using PosBackend.Repositories;
namespace PosBackend.Services
{
    /// <summary>
    /// Application service of the POS backend: product operations go through the
    /// product repository, order operations use the Dapr state store and pub/sub,
    /// and the Save/Update methods persist orders through the orders repository.
    /// </summary>
    public class PosService
    {
        private readonly OrdersRepository _ordersRepo;
        private readonly ProductRepository _productRepo;
        private readonly DaprClient _daprClient;
        private const string StoreName = "statestore";

        public PosService(OrdersRepository ordersRepo, ProductRepository productRepo, DaprClient daprClient)
        {
            _ordersRepo = ordersRepo;
            _productRepo = productRepo;
            _daprClient = daprClient;
        }

        // Product operations

        /// <summary>Returns all products from the repository.</summary>
        public async Task<IEnumerable<Product>> GetProductsAsync() =>
            await _productRepo.GetAllAsync();

        /// <summary>Returns the product with the given id, or <c>null</c> when there is none.</summary>
        public async Task<Product?> GetProductAsync(int id) =>
            await _productRepo.GetByIdAsync(id);

        /// <summary>Stores the product and writes the id returned by the repository back to it.</summary>
        public async Task<Product> CreateProductAsync(Product product)
        {
            product.Id = await _productRepo.CreateAsync(product);
            return product;
        }

        // Order operations backed by Dapr state management

        /// <summary>Reads an order from the Dapr state store.</summary>
        /// <param name="orderId">
        /// State key of the order. <see cref="CreateOrderAsync"/> stores orders under their
        /// order number, so that is the value to pass here.
        /// </param>
        public async Task<Order?> GetOrderAsync(string orderId)
        {
            return await _daprClient.GetStateAsync<Order>(StoreName, orderId);
        }

        /// <summary>
        /// Assigns an order number, creation time and total to the order, saves it to the
        /// state store under its order number and publishes it on the <c>orderCreated</c> topic.
        /// </summary>
        /// <remarks><c>order.Id</c> is not assigned here.</remarks>
        public async Task<Order> CreateOrderAsync(Order order)
        {
            // Generate the order number
            order.OrderNumber = $"ORD-{DateTime.UtcNow:yyyyMMdd-HHmmss}-{Guid.NewGuid().ToString()[..4]}";
            order.CreatedAt = DateTime.UtcNow;
            // Compute the total
            order.TotalAmount = order.Items.Sum(i => i.TotalPrice);
            // Save to the Dapr state store
            Console.WriteLine($"保存到 Dapr 状态存储 {order.OrderNumber}");
            await _daprClient.SaveStateAsync(StoreName, order.OrderNumber, order);
            // Publish the order-created event
            Console.WriteLine($"发布订单创建事件 {order.OrderNumber}");
            await _daprClient.PublishEventAsync("pubsub", "orderCreated", order);
            return order;
        }

        /// <summary>
        /// Changes the status of the order stored under <paramref name="orderNumber"/> in the
        /// state store and publishes an <c>orderStatusUpdated</c> event carrying the order
        /// number and the new status.
        /// </summary>
        /// <returns><c>false</c> when no order is stored under that key.</returns>
        public async Task<bool> UpdateOrderStatusAsync(string orderNumber, string status)
        {
            var order = await GetOrderAsync(orderNumber);
            if (order == null)
            {
                return false;
            }

            order.Status = status;
            await _daprClient.SaveStateAsync(StoreName, orderNumber, order);
            // Publish the status-updated event
            await _daprClient.PublishEventAsync("pubsub", "orderStatusUpdated",
                new { OrderNumber = orderNumber, Status = status });
            return true;
        }

        /// <summary>
        /// Persists the order through the orders repository. Failures are written to the
        /// console and reported as <c>false</c> instead of being thrown.
        /// </summary>
        /// <returns><c>true</c> when the repository returned a positive id.</returns>
        public async Task<bool> SaveOrderAsync(Order order)
        {
            try
            {
                int newId = await _ordersRepo.CreateAsync(order);
                return newId > 0;
            }
            catch (Exception ex)
            {
                // Best effort by design: log and signal failure through the return value.
                Console.WriteLine($"保存订单时出错: {ex.Message}");
                return false;
            }
        }

        /// <summary>
        /// Updates an order in the database and reports whether a row was changed.
        /// </summary>
        /// <remarks>
        /// With a positive <c>order.Id</c> the whole order row is updated. Otherwise the
        /// order is resolved by <c>order.OrderNumber</c> and only its status is updated:
        /// the <c>orderStatusUpdated</c> event published by <see cref="UpdateOrderStatusAsync"/>
        /// carries only the order number and the status, so no other column can be trusted.
        /// </remarks>
        /// <returns><c>true</c> when a database row was updated.</returns>
        public async Task<bool> UpdateOrderAsync(Order order)
        {
            if (order.Id > 0)
            {
                return await _ordersRepo.UpdateAsync(order);
            }

            if (string.IsNullOrWhiteSpace(order.OrderNumber))
            {
                return false;
            }

            var existing = await _ordersRepo.GetByOrderNumberAsync(order.OrderNumber);
            if (existing == null)
            {
                return false;
            }

            return await _ordersRepo.UpdateOrderStatusAsync(existing.Id, order.Status);
        }
    }
}
7. 创建 API 控制器 Controllers/ProductsController.cs
using Microsoft.AspNetCore.Mvc;
using PosBackend.Models;
using PosBackend.Services;
namespace PosBackend.Controllers;
/// <summary>
/// HTTP endpoints for products, routed under the controller name.
/// </summary>
[ApiController]
[Route("[controller]")]
public class ProductsController : ControllerBase
{
    private readonly PosService _posService;

    public ProductsController(PosService posService) => _posService = posService;

    /// <summary>Returns all products with status 200.</summary>
    [HttpGet]
    public async Task<ActionResult<IEnumerable<Product>>> GetProducts()
    {
        return Ok(await _posService.GetProductsAsync());
    }

    /// <summary>Returns the product with the given id, or 404 when the service finds none.</summary>
    [HttpGet("{id}")]
    public async Task<ActionResult<Product>> GetProduct(int id)
    {
        var found = await _posService.GetProductAsync(id);
        if (found == null)
        {
            return NotFound();
        }

        return Ok(found);
    }

    /// <summary>Creates a product and answers 201 with a location pointing at <see cref="GetProduct"/>.</summary>
    [HttpPost]
    public async Task<ActionResult<Product>> CreateProduct(Product product)
    {
        var stored = await _posService.CreateProductAsync(product);
        var routeValues = new { id = stored.Id };
        return CreatedAtAction(nameof(GetProduct), routeValues, stored);
    }
}
Controllers/OrdersController.cs
using Dapr;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using PosBackend.Models;
using PosBackend.Repositories;
using PosBackend.Services;
namespace PosBackend.Controllers
{
    /// <summary>
    /// HTTP endpoints for orders plus the Dapr pub/sub handlers for the
    /// <c>orderCreated</c> and <c>orderStatusUpdated</c> topics.
    /// </summary>
    [Route("api/[controller]")]
    [ApiController]
    public class OrdersController : ControllerBase
    {
        private readonly PosService _posService;

        public OrdersController(PosService posService)
        {
            _posService = posService;
        }

        /// <summary>Returns the order stored under the given order number, or 404.</summary>
        [HttpGet("{orderNumber}")]
        public async Task<ActionResult<Order>> GetOrder(string orderNumber)
        {
            var order = await _posService.GetOrderAsync(orderNumber);
            return order == null ? NotFound() : Ok(order);
        }

        /// <summary>Creates an order and answers 201 with a location pointing at <see cref="GetOrder"/>.</summary>
        [HttpPost]
        public async Task<ActionResult<Order>> CreateOrder(Order order)
        {
            var createdOrder = await _posService.CreateOrderAsync(order);
            return CreatedAtAction(nameof(GetOrder), new { orderNumber = createdOrder.OrderNumber }, createdOrder);
        }

        /// <summary>Changes the status of an order; 204 on success, 404 when the order is unknown.</summary>
        [HttpPatch("{orderNumber}/status")]
        public async Task<IActionResult> UpdateOrderStatus(string orderNumber, [FromBody] string status)
        {
            var success = await _posService.UpdateOrderStatusAsync(orderNumber, status);
            return success ? NoContent() : NotFound();
        }

        /// <summary>
        /// Dapr subscription handler for <c>orderCreated</c>: persists the received order.
        /// </summary>
        /// <remarks>
        /// The order is validated by its order number. A freshly created order has no
        /// database id yet (<c>PosService.CreateOrderAsync</c> publishes it without setting
        /// <c>Id</c>), so requiring a positive id would reject every event.
        /// </remarks>
        [Topic("pubsub", "orderCreated")] // Dapr subscription
        [HttpPost("orderCreated")]
        public async Task<IActionResult> HandleOrderCreateEvent([FromBody] Order order)
        {
            // Validate before touching the payload so a missing body cannot cause a NullReferenceException.
            if (order == null || string.IsNullOrWhiteSpace(order.OrderNumber))
            {
                return BadRequest("无效的订单数据");
            }

            Console.WriteLine($"HandleOrderCreateEvent 处理订单创建事件 {order.Id}");

            // Save to the database
            await _posService.SaveOrderAsync(order);
            return Ok();
        }

        /// <summary>
        /// Dapr subscription handler for <c>orderStatusUpdated</c>: forwards the received
        /// order data to the service for updating the database.
        /// </summary>
        /// <remarks>
        /// The payload published by <c>PosService.UpdateOrderStatusAsync</c> contains only
        /// <c>OrderNumber</c> and <c>Status</c>, so an order number is accepted as
        /// identification in addition to a positive id.
        /// </remarks>
        [Topic("pubsub", "orderStatusUpdated")] // Dapr subscription
        [HttpPost("orderStatusUpdated")]
        public async Task<IActionResult> HandleOrderUpdateEvent([FromBody] Order order)
        {
            // Invalid when the body is missing or identifies the order neither by id nor by number.
            if (order == null || (order.Id <= 0 && string.IsNullOrWhiteSpace(order.OrderNumber)))
            {
                return BadRequest("无效的订单数据");
            }

            Console.WriteLine($"HandleOrderUpdateEvent 处理订单状态更新事件 {order.Id}");

            // Update the order in the database
            await _posService.UpdateOrderAsync(order);
            return Ok();
        }
    }
}
8. 配置依赖注入 Program.cs
using Npgsql;
using PosBackend.Repositories;
using PosBackend.Services;
using System.Data;
var builder = WebApplication.CreateBuilder(args);

// Dapper matches column names to property names literally unless told otherwise.
// The schema uses snake_case (stock_quantity, order_number, created_at, ...) while the
// models use PascalCase, so enable underscore-insensitive matching once at startup.
Dapper.DefaultTypeMap.MatchNamesWithUnderscores = true;

// Add services to the container
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddControllers().AddDapr(); // adds Dapr support
builder.Services.AddDaprClient(dapr => dapr
    .UseHttpEndpoint($"http://localhost:3500")
    .UseGrpcEndpoint($"http://localhost:50001"));

// Configure the PostgreSQL connection; fail fast with a clear message when it is missing
var connectionString = builder.Configuration.GetConnectionString("PostgreSQL")
    ?? throw new InvalidOperationException("Connection string 'PostgreSQL' is not configured.");
builder.Services.AddScoped<IDbConnection>(_ =>
{
    var connection = new NpgsqlConnection(connectionString);
    connection.Open(); // open explicitly so repositories receive a ready connection
    return connection;
});

// Register repositories
builder.Services.AddScoped<OrdersRepository>();
builder.Services.AddScoped<ProductRepository>();

// Register services
builder.Services.AddScoped<PosService>();

var app = builder.Build();

// Configure the HTTP request pipeline.
// Swagger is registered once and only in Development (it used to be added a second
// time unconditionally, which also exposed the UI outside Development).
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseAuthorization();

// Enable Dapr pub/sub: CloudEvents unwrapping is registered ahead of the endpoints
app.UseCloudEvents();
app.MapControllers();
app.MapSubscribeHandler(); // maps the Dapr subscription endpoint

app.Run();
9. 添加配置 appsettings.json
{
"ConnectionStrings": {
"PostgreSQL": "User ID=postgres;Password=password;Host=localhost;Port=5432;Database=posdb;"
},
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*"
}
部署与运行
# dapr run --app-id pos-backend --app-port 5000 --dapr-http-port 3500 --components-path ./components dotnet run
# Example command (adjust --app-port to the port the application listens on).
# The sidecar ports are pinned to the endpoints hard-coded in Program.cs (HTTP 3500, gRPC 50001).
dapr run --app-id PosBackend --app-port 5194 --dapr-http-port 3500 --dapr-grpc-port 50001 --components-path ./components --enable-profiling=false -- dotnet run
感谢阅读我的文章,现在您已准备好构建下一代高性能企业解决方案,本文就是您的起点。我们展示了 Dapr 如何简化微服务通信和状态管理,Dapper 如何提供快速轻量级的数据库访问,以及 PostgreSQL 与 .NET 10 如何利用现代、可扩展的工具赋能您的后端。无论您是为了速度、可扩展性还是简单性进行架构设计——本指南都奠定了坚实的基础。让我们携手构建更智能的微服务。