解锁.NET Webhook终极方案:基于PostgreSQL队列的可靠发送架构

作者:微信公众号:【架构师老卢】
7-20 19:10
13

Webhook是实现服务器间最终一致性事件通信的核心方案,无需依赖共享资源。但.NET原生缺乏相关工具支持。本文提供完整的Webhook发送实现方案(接续前文测试主题),涵盖从基础发送器到生产级队列系统的全流程实现。

可直接跳至文末查看 TLDR; 总结


🔧 构建Webhook发送器

初始化项目与依赖

dotnet new web
dotnet add package OneOf          # 优雅的错误处理
dotnet add package System.Net.Http
dotnet add package Microsoft.Extensions.Http

定义Webhook模型与转换逻辑

public interface IWebhook
{
    long Id { get; }
    string Url { get; }
    JsonDocument Body { get; }
}

public static class IWebhookMapper
{
    public static HttpRequestMessage ToHttpRequestMessage(this IWebhook webhook)
    {
        return new HttpRequestMessage(HttpMethod.Post, webhook.Url)
        {
            Content = new StringContent(
                webhook.Body.RootElement.GetRawText(), 
                Encoding.UTF8, 
                "application/json"
            )
        };
    }
}

核心发送器实现

public class WebhookSender(HttpClient client, ILogger<WebhookSender> logger)
{
    public async Task<OneOf<HttpResponseMessage, Exception>> Send(IWebhook record)
    {
        try
        {
            logger.LogDebug("发送Webhook {webhookId}", record.Id);
            var request = record.ToHttpRequestMessage();
            var response = await client.SendAsync(request);
            logger.LogInformation("已发送Webhook {webhookId}", record.Id);
            return response;
        }
        catch (Exception ex)
        {
            return ex;
        }
    }
}

🔁 构建持续发送迭代系统

存储接口定义

public interface IWebhookStore<TRecord> where TRecord : IWebhook
{
    Task RunTransactionWithPendingRecords(
        Func<TRecord[], Task> changes, 
        CancellationToken cancellationToken
    );
    Task HandleResult(TRecord record, OneOf<HttpResponseMessage, Exception> result);
}

持续迭代处理器

public class WebhookSendingIteration<TRecord>(
    IWebhookStore<TRecord> store, 
    WebhookSender sender, 
    ILogger<WebhookSendingIteration<TRecord>> logger
) : IContinuousWorkIteration where TRecord : IWebhook
{
    public async Task Run(CancellationToken stoppingToken)
    {
        logger.LogInformation("开始Webhook发送迭代");
        
        await store.RunTransactionWithPendingRecords(async pendingRecords => {
            await Task.WhenAll(pendingRecords.Select(async record => {
                var result = await sender.Send(record);
                await store.HandleResult(record, result);
            }));
        }, stoppingToken);
        
        logger.LogInformation("Webhook发送迭代完成");
        await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
    }
}

DI容器注册扩展

public static class WebhookSendRegistration
{
    public static IServiceCollection AddWebhookSending<TRecord>(
        this IServiceCollection services, 
        Func<IServiceProvider, IWebhookStore<TRecord>> storeFactory
    ) where TRecord : IWebhook
    {
        services.AddHttpClient();
        services.AddScoped<WebhookSender>();
        services.AddScoped(storeFactory);
        services.AddContinuousBackgroundService<WebhookSendingIteration<TRecord>>();
        return services;
    }
}

🗃️ PostgreSQL存储层实现

数据实体定义

public class WebhookRecord : IWebhook
{
    public long Id { get; set; }
    public required string Url { get; set; }
    public required JsonDocument Body { get; set; }
    public string Status { get; set; } = WebhookStatus.Pending;
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    public int? ResponseStatusCode { get; set; }
    public JsonDocument? Response { get; set; }
}

public class WebhookStatus
{
    public const string Pending = "Pending";
    public const string Success = "Success";
    public const string Error = "Error";
}

高并发存储实现

public class PostgresWebhookStore<TDb>(
    TDb db, 
    Func<TDb, DbSet<WebhookRecord>> webhookRecordSetExtractor
) : IWebhookStore<WebhookRecord> where TDb : DbContext
{
    public async Task RunTransactionWithPendingRecords(
        Func<WebhookRecord[], Task> processing, 
        CancellationToken cancellationToken = default)
    {
        using var transaction = await db.Database.BeginTransactionAsync(cancellationToken);
        var pendingForUpdate = await webhookRecordSetExtractor(db)
            .FromSqlInterpolated($"""
                SELECT * 
                FROM webhook_records 
                WHERE status = {WebhookStatus.Pending}
                LIMIT 100
                FOR UPDATE SKIP LOCKED  -- 关键并发控制
                """)
            .ToArrayAsync();
        
        await processing(pendingForUpdate);
        await db.SaveChangesAsync(cancellationToken);
        await transaction.CommitAsync(cancellationToken);
    }

    public async Task HandleResult(WebhookRecord record, OneOf<HttpResponseMessage, Exception> result)
    {
        // 处理结果逻辑(略)
    }
}

便捷注册方法

public static class PostgresWebhookStoreRegistration
{
    public static IServiceCollection AddPostgresWebhookSending<TDb>(
        this IServiceCollection services, 
        Func<TDb, DbSet<WebhookRecord>> webhookRecordSetExtractor
    ) where TDb : DbContext
    {
        return services.AddWebhookSending(x => 
            new PostgresWebhookStore<TDb>(
                x.GetRequiredService<TDb>(), 
                webhookRecordSetExtractor
            )
        );
    }
}

🧪 端到端测试方案

测试数据初始化

await app.Services.EnsureRecreated<Db>(async db => {
    db.WebhookRecords.Add(new WebhookRecord() {
        Url = "http://localhost:5195/webhooks/dump/from-record",
        Body = JsonDocument.Parse("{\"example\": \"one\"}")
    });
    await db.SaveChangesAsync();
});

完整Program.cs配置

var builder = WebApplication.CreateBuilder(args);
builder.Logging.AddSimpleConsole(c => c.SingleLine = true);
        
builder.Services.AddPostgres<Db>();
builder.Services.AddPostgresWebhookSending<Db>();

var app = builder.Build();

// 初始化测试数据(略)

app.UseRequestBodyStringReader();
app.MapWebhookDump<Db>();
app.Run();

// 数据库上下文定义
public class Db(DbContextOptions<Db> options) : DbContext(options), IDbWith<WebhookRecord>
{
    public DbSet<WebhookRecord> WebhookRecords { get; set; }
}

验证输出示例

[
  {
    "id": 1,
    "path": "/webhooks/dump/from-record",
    "body": { "example": "one" },
    "time": "2025-04-30T14:11:49.607198Z"
  }
]

🚀 TLDR; 开箱即用方案

直接使用现成NuGet包:

dotnet add package Nist.Webhooks.Sender

三行代码集成

// 1. 添加PostgreSQL支持
builder.Services.AddPostgres<Db>(); 

// 2. 启用Webhook发送服务
builder.Services.AddContinuousWebhookSending(sp => 
    sp.GetRequiredService<Db>()
); 

// 3. 定义数据库上下文
public class Db(DbContextOptions<Db> options) : 
    DbContext(options), 
    IDbWithWebhookRecord<WebhookRecord> 
{
    public DbSet<WebhookRecord> WebhookRecords { get; set; }
}

架构价值
✅ 分布式队列处理
✅ 并发控制(FOR UPDATE SKIP LOCKED
✅ 开箱即用生产级方案
✅ 完整事务支持与错误处理

相关留言评论
昵称:
邮箱:
阅读排行