通过C#与.NET 9,释放响应式编程的威力!本文将深入探讨响应式扩展(Rx.NET)、事件驱动架构与异步流的实战应用,助你构建高效、可扩展的实时系统。无论你是新手还是专家,都能在此找到构建高响应性应用的完整方案。#CSharp #响应式编程
响应式系统(Reactive Systems)遵循响应式宣言原则,具备响应性(Responsive)、弹性(Resilient)、**可伸缩(Elastic)和消息驱动(Message-Driven)特性。在C#与.NET中,这类系统常通过响应式扩展(Rx.NET)**实现,高效处理异步数据流与实时数据处理。
响应性
弹性
可伸缩
消息驱动
Rx是一个通过可观察序列(Observable Sequences)与LINQ风格操作符构建异步事件驱动程序的库,是C#开发响应式系统的核心工具。
IObservable<T>
表示可观察的数据流或事件流,以推送模式随时间发射数据项。
IObserver<T>
表示订阅IObservable<T>
并响应数据的观察者。
操作符
提供Select
、Where
、Merge
、Throttle
等LINQ风格操作符,用于转换、过滤与组合数据流。
本案例展示如何用C#与.NET 9构建一个事件驱动、可伸缩的Web API,实时处理农产品市场价格数据流并提供分析洞察。
dotnet new webapi -n AgriMarketAnalysis # 创建Web API项目
cd AgriMarketAnalysis
dotnet add package System.Reactive # 安装Rx.NET
dotnet add package Microsoft.EntityFrameworkCore.SqlServer # 数据库支持
namespace AgriMarketAnalysis.Models
{
public class AgriculturalGood
{
public int Id { get; set; }
public string Name { get; set; } // 商品名称(如小麦、玉米)
public decimal Price { get; set; } // 当前市场价格
public DateTime Timestamp { get; set; }// 数据时间戳
public string Region { get; set; } // 市场区域
}
}
数据库上下文AppDbContext.cs
using Microsoft.EntityFrameworkCore;
namespace AgriMarketAnalysis.Data
{
public class AppDbContext : DbContext
{
public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }
public DbSet<AgriculturalGood> AgriculturalGoods { get; set; }
}
}
数据库连接配置appsettings.json
{
"ConnectionStrings": {
"DefaultConnection": "Server=localhost;Database=AgriMarket;User Id=sa;Password=your_password;TrustServerCertificate=True;"
}
}
注册服务Program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDbContext<AppDbContext>(options =>
options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection")));
builder.Services.AddSingleton<MarketDataService>(); // 注册数据服务
Services/MarketDataService.cs
using System.Reactive.Subjects;
using AgriMarketAnalysis.Models;
namespace AgriMarketAnalysis.Services
{
public class MarketDataService
{
private readonly Subject<AgriculturalGood> _marketDataStream = new();
private readonly AppDbContext _dbContext;
public MarketDataService(AppDbContext dbContext) => _dbContext = dbContext;
public IObservable<AgriculturalGood> MarketDataStream => _marketDataStream;
public void AddMarketData(AgriculturalGood good)
{
try
{
_dbContext.AgriculturalGoods.Add(good); // 保存到数据库
_dbContext.SaveChanges();
_marketDataStream.OnNext(good); // 推送至数据流
}
catch (Exception ex)
{
Console.WriteLine($"数据保存失败: {ex.Message}");
}
}
}
}
Controllers/MarketAnalysisController.cs
using Microsoft.AspNetCore.Mvc;
using System.Reactive.Linq;
namespace AgriMarketAnalysis.Controllers
{
[ApiController]
[Route("api/[controller]")]
public class MarketAnalysisController : ControllerBase
{
private readonly MarketDataService _marketDataService;
public MarketAnalysisController(MarketDataService marketDataService)
=> _marketDataService = marketDataService;
[HttpPost("add")]
public IActionResult AddMarketData([FromBody] AgriculturalGood good)
{
_marketDataService.AddMarketData(good);
return Ok();
}
[HttpGet("trends")]
public async Task<IActionResult> GetMarketTrends()
{
var trends = await _marketDataService.MarketDataStream
.Where(good => good.Timestamp >= DateTime.UtcNow.AddHours(-1)) // 近1小时数据
.GroupBy(good => good.Name)
.Select(group => new
{
Good = group.Key,
AveragePrice = group.Average(g => g.Price) // 计算均价
})
.ToList();
return Ok(trends);
}
}
}
dotnet run # 启动应用
/swagger
):
/api/marketanalysis/add
:添加市场数据。/api/marketanalysis/trends
:获取实时价格趋势。本文通过实战案例展示了如何在.NET 9中利用响应式编程构建高效、实时的农产品市场分析系统。通过Rx.NET与异步流技术,开发者能够轻松应对高并发场景,打造高性能应用。
源代码获取:公众号回复消息【code:41885
】