Webhook是实现服务器间最终一致性事件通信的核心方案,无需依赖共享资源。但.NET原生缺乏相关工具支持。本文提供完整的Webhook发送实现方案(接续前文测试主题),涵盖从基础发送器到生产级队列系统的全流程实现。
可直接跳至文末查看 TLDR; 总结
dotnet new web
dotnet add package OneOf # 优雅的错误处理
dotnet add package System.Net.Http
dotnet add package Microsoft.Extensions.Http
public interface IWebhook
{
long Id { get; }
string Url { get; }
JsonDocument Body { get; }
}
public static class IWebhookMapper
{
public static HttpRequestMessage ToHttpRequestMessage(this IWebhook webhook)
{
return new HttpRequestMessage(HttpMethod.Post, webhook.Url)
{
Content = new StringContent(
webhook.Body.RootElement.GetRawText(),
Encoding.UTF8,
"application/json"
)
};
}
}
public class WebhookSender(HttpClient client, ILogger<WebhookSender> logger)
{
public async Task<OneOf<HttpResponseMessage, Exception>> Send(IWebhook record)
{
try
{
logger.LogDebug("发送Webhook {webhookId}", record.Id);
var request = record.ToHttpRequestMessage();
var response = await client.SendAsync(request);
logger.LogInformation("已发送Webhook {webhookId}", record.Id);
return response;
}
catch (Exception ex)
{
return ex;
}
}
}
public interface IWebhookStore<TRecord> where TRecord : IWebhook
{
Task RunTransactionWithPendingRecords(
Func<TRecord[], Task> changes,
CancellationToken cancellationToken
);
Task HandleResult(TRecord record, OneOf<HttpResponseMessage, Exception> result);
}
public class WebhookSendingIteration<TRecord>(
IWebhookStore<TRecord> store,
WebhookSender sender,
ILogger<WebhookSendingIteration<TRecord>> logger
) : IContinuousWorkIteration where TRecord : IWebhook
{
public async Task Run(CancellationToken stoppingToken)
{
logger.LogInformation("开始Webhook发送迭代");
await store.RunTransactionWithPendingRecords(async pendingRecords => {
await Task.WhenAll(pendingRecords.Select(async record => {
var result = await sender.Send(record);
await store.HandleResult(record, result);
}));
}, stoppingToken);
logger.LogInformation("Webhook发送迭代完成");
await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
}
}
public static class WebhookSendRegistration
{
public static IServiceCollection AddWebhookSending<TRecord>(
this IServiceCollection services,
Func<IServiceProvider, IWebhookStore<TRecord>> storeFactory
) where TRecord : IWebhook
{
services.AddHttpClient();
services.AddScoped<WebhookSender>();
services.AddScoped(storeFactory);
services.AddContinuousBackgroundService<WebhookSendingIteration<TRecord>>();
return services;
}
}
public class WebhookRecord : IWebhook
{
public long Id { get; set; }
public required string Url { get; set; }
public required JsonDocument Body { get; set; }
public string Status { get; set; } = WebhookStatus.Pending;
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public int? ResponseStatusCode { get; set; }
public JsonDocument? Response { get; set; }
}
public class WebhookStatus
{
public const string Pending = "Pending";
public const string Success = "Success";
public const string Error = "Error";
}
public class PostgresWebhookStore<TDb>(
TDb db,
Func<TDb, DbSet<WebhookRecord>> webhookRecordSetExtractor
) : IWebhookStore<WebhookRecord> where TDb : DbContext
{
public async Task RunTransactionWithPendingRecords(
Func<WebhookRecord[], Task> processing,
CancellationToken cancellationToken = default)
{
using var transaction = await db.Database.BeginTransactionAsync(cancellationToken);
var pendingForUpdate = await webhookRecordSetExtractor(db)
.FromSqlInterpolated($"""
SELECT *
FROM webhook_records
WHERE status = {WebhookStatus.Pending}
LIMIT 100
FOR UPDATE SKIP LOCKED -- 关键并发控制
""")
.ToArrayAsync();
await processing(pendingForUpdate);
await db.SaveChangesAsync(cancellationToken);
await transaction.CommitAsync(cancellationToken);
}
public async Task HandleResult(WebhookRecord record, OneOf<HttpResponseMessage, Exception> result)
{
// 处理结果逻辑(略)
}
}
public static class PostgresWebhookStoreRegistration
{
public static IServiceCollection AddPostgresWebhookSending<TDb>(
this IServiceCollection services,
Func<TDb, DbSet<WebhookRecord>> webhookRecordSetExtractor
) where TDb : DbContext
{
return services.AddWebhookSending(x =>
new PostgresWebhookStore<TDb>(
x.GetRequiredService<TDb>(),
webhookRecordSetExtractor
)
);
}
}
await app.Services.EnsureRecreated<Db>(async db => {
db.WebhookRecords.Add(new WebhookRecord() {
Url = "http://localhost:5195/webhooks/dump/from-record",
Body = JsonDocument.Parse("{\"example\": \"one\"}")
});
await db.SaveChangesAsync();
});
var builder = WebApplication.CreateBuilder(args);
builder.Logging.AddSimpleConsole(c => c.SingleLine = true);
builder.Services.AddPostgres<Db>();
builder.Services.AddPostgresWebhookSending<Db>();
var app = builder.Build();
// 初始化测试数据(略)
app.UseRequestBodyStringReader();
app.MapWebhookDump<Db>();
app.Run();
// 数据库上下文定义
public class Db(DbContextOptions<Db> options) : DbContext(options), IDbWith<WebhookRecord>
{
public DbSet<WebhookRecord> WebhookRecords { get; set; }
}
[
{
"id": 1,
"path": "/webhooks/dump/from-record",
"body": { "example": "one" },
"time": "2025-04-30T14:11:49.607198Z"
}
]
直接使用现成NuGet包:
dotnet add package Nist.Webhooks.Sender
// 1. 添加PostgreSQL支持
builder.Services.AddPostgres<Db>();
// 2. 启用Webhook发送服务
builder.Services.AddContinuousWebhookSending(sp =>
sp.GetRequiredService<Db>()
);
// 3. 定义数据库上下文
public class Db(DbContextOptions<Db> options) :
DbContext(options),
IDbWithWebhookRecord<WebhookRecord>
{
public DbSet<WebhookRecord> WebhookRecords { get; set; }
}
架构价值:
✅ 分布式队列处理
✅ 并发控制(FOR UPDATE SKIP LOCKED
)
✅ 开箱即用生产级方案
✅ 完整事务支持与错误处理