C# 中的并发性:使用 BlockingCollection 实现生产者-消费者模式

作者:微信公众号:【架构师老卢】
2-18 19:50
222

概述:介绍在快节奏的现代软件开发领域,并发是一个基石概念。并发支持同时执行多个任务,从而优化程序性能和响应能力。由于能够跨线程或进程分配工作,与顺序执行相比,我们可以显著减少处理时间。无论是通过单个环境中的处理器时间切片实现,还是跨分布式系统(如容器和服务器)部署代码来实现,有效的并发性都可以最大限度地提高系统资源利用率和整体吞吐量。这种错综复杂的景观的核心是生产者-消费者模式,这种模式与现实世界的场景产生共鸣,并囊括了并发编程的本质。在踏上了进入这个动态领域的旅程之后,本文深入探讨了并发领域,特别关注我个人使用 C# 的强大工具 BlockingCollection 类对生产者-消费者模式的探索和

介绍

在快节奏的现代软件开发领域,并发是一个基石概念。并发支持同时执行多个任务,从而优化程序性能和响应能力。

由于能够跨线程或进程分配工作,与顺序执行相比,我们可以显著减少处理时间。无论是通过单个环境中的处理器时间切片实现,还是跨分布式系统(如容器和服务器)部署代码来实现,有效的并发性都可以最大限度地提高系统资源利用率和整体吞吐量。

这种错综复杂的景观的核心是生产者-消费者模式,这种模式与现实世界的场景产生共鸣,并囊括了并发编程的本质。

在踏上了进入这个动态领域的旅程之后,本文深入探讨了并发领域,特别关注我个人使用 C# 的强大工具 BlockingCollection 类对生产者-消费者模式的探索和实现。通过探索这种模式及其提供的功能,我们发现了将软件系统转变为弹性、高效和可扩展的实体的潜力,以满足现代计算不断变化的需求。

生产者-消费者格局

通常,在实时场景中,我们可以设计并发代码来遵循生产者-消费者模式,正如 Edsger W. Dijkstra 在 1965 年解释的那样:

“我们考虑两个过程,分别称为'生产者'和'消费者'。生产者是一个循环过程,每次经历循环时,它都会产生一定部分信息,这些信息必须由消费者处理。消费者也是一个循环过程,每当它经历它的循环时,它就可以处理下一部分信息,就像生产者所产生的那样......”

这个想法可以扩展到多个并发生产者和多个并发使用者。

但是,这种简单的模式可能会产生并发进程可能产生的最具挑战性的问题(如此进一步解释)。

BlockingCollection

幸运的是,C# 有一个内置的集合实现,可用于实现此模式。

C# 中的类是命名空间的一部分,它提供了一个线程安全集合,该集合专为一个或多个生成者生成数据或任务,一个或多个使用者使用或处理它们的情况而设计。BlockingCollection System.Collections.Concurrent

该类将并发集合的功能(如 或 )与同步和阻止功能相结合。BlockingCollectionConcurrentQueueConcurrentStack

演示

让我们来看看如何使用该类:BlockingCollection

public class Demo  
{  
    private readonly int _publishTimeout;  
    private readonly BlockingCollection<int> blockingCollection;  
  
    public Demo(int bound, int publishTimeout)  
    {  
        _publishTimeout = publishTimeout;  
        blockingCollection = new BlockingCollection<int>(new ConcurrentQueue<int>(), boundedCapacity: bound);  
    }  
  
    // Try produce one item into blockingCollection within specified timeout  
    public async Task Produce(int item, CancellationToken cancellationToken)  
    {  
        await Task.Delay(millisecondsDelay: 10, cancellationToken: cancellationToken); // Simulating some work  
        bool published = blockingCollection.TryAdd(++item, millisecondsTimeout: _publishTimeout, cancellationToken);  
  
        Console.WriteLine(published ? $"Produced item {item}" : $"SKipped item {item}, Collection Is Full");  
    }  
  
    // Consumes one item at a time from the blockingCollection  
    // while blockingCollection is empty, consumer thread goes to sleep inside blockingCollection.GetConsumingEnumerable()  
    public async Task Consume(int consumerId, CancellationToken cancellationToken)  
    {        
        foreach (int item in blockingCollection.GetConsumingEnumerable())  
        {  
            Console.WriteLine($"Consumer {consumerId} processed item {item}");  
            await Task.Delay(millisecondsDelay: 10, cancellationToken: cancellationToken); // Simulating some work  
        }  
    }  
}   

在上面的示例中,您可以看到一个基本示例,说明如何使用幕后实现生产者和消费者函数。BlockingCollectionConcurrentQueue

如您所见,具有边界功能,可以通过在构造函数中指定 boundedCapacity 变量来激活。BlockingCollectionBlockingCollection

边界意味着您可以设置集合的最大容量。这在某些情况下很重要,因为它使你能够控制内存中集合的最大大小,并防止生成线程在使用线程之前移动得太远。

多个线程或任务可以同时向集合添加项,如果集合达到其指定的最大容量,则生成线程将休眠,直到删除项。

如果您不希望在集合中有空间之前让生产者线程休眠,请使用“TryAdd”函数,指定生成消息的超时时间,并自行处理队列已满的情况。

此外,多个使用者可以同时删除项目,如果集合变为空,则使用线程将休眠,直到生产者添加项目。

需要注意的是,当生产者因集合已满而休眠或消费者因集合为空而休眠时,不会出现繁忙等待的情况。此方法可最大程度地减少这些线程中的 CPU 消耗。

让我们使用我们的演示类:

private static void Main(string[] args)  
{  
    const int concurrentProducers = 20;  
    const int concurrentConsumers = 20;  
    const int bound = 10;  
    const int publishTimeout = 100;  
  
    var demo = new Demo(bound, publishTimeout);  
    
    // start multiple concurrent producers  
    var producers = Enumerable.Range(0, concurrentProducers).Select(x => demo.Produce(x, CancellationToken.None));  
      
    // start multiple concurrent consumers  
    var consumers = Enumerable.Range(0, concurrentConsumers).Select(x => demo.Consume(x, CancellationToken.None));  
  
    var allTasks = producers.Union(consumers).ToArray();  
    await Task.WhenAll(allTasks);  
} 
// the program won't reach here since a consumer task doesn't end when it is empty, it sleeps until more messages arrive

如您所见,您可以控制生产者和消费者的并发级别,以及队列绑定,这可以为您提供内存消耗控制。

我对 BlockingCollection 的体验

我最近利用这个类创建了一个异步日志记录机制。该项目要求记录来自两个不同 API 调用的请求和响应数据。这些数据集处理Payoneer卡产品的3DS Secure流程。我们的具体要求是将来自这些 API 调用的某些数据保存在关系数据库中,以便对 3DS 验证流程进行分析研究。

考虑到 API 调用的性质(两者都是完成金融交易(3DS 验证流程的一部分)的关键),我试图最大限度地减少响应时间。最重要的是,客户的体验保持无缝,他们不会等待数据记录在数据库中。为了解决这个问题,我设计了一个解决方案,其中 API 调用日志被定向到 .此集合随后由负责将消息保存在数据库中的专用使用者进行管理。值得一提的是,我们对在进程崩溃或重新启动期间丢失一些日志的想法感到满意。BlockingCollection

为了增强此架构,我还采用了中介模式。每个消息类型都由关联的处理程序系统地使用,从而确保专门处理。这种机制允许我们通过同一个 BlockingCollection 汇集不同的消息,为它们最终存储在不同的数据库表中铺平了道路。为了实现调解器模式,我集成了 MediatR NuGet 包。

下面是将阻塞集合与消息处理程序的中介器结合使用的示例:

public class LoggerQueue : ILoggerQueue  
{  
    private static readonly ILogger _logger = LogManager.GetCurrentClassLogger();  
  
    private readonly IMediator _mediator;  
  
    public BlockingCollection<IRequest> Queue { get; }  
    public int Bound { get; }  
    public int AddTimeoutMS { get; }  
  
    public LoggerQueue(IMediator mediator, IConfiguration configuration)  
    {  
        AddTimeoutMS = configuration.GetSection("AppSettings").GetValue<int>("LoggerQueue_AddTimeoutMS");  
        Bound = configuration.GetSection("AppSettings").GetValue<int>("LoggerQueue_Bound");  
        Queue = new BlockingCollection<IRequest>(new ConcurrentQueue<IRequest>(), Bound);  
        _mediator = mediator;  
    }  
  
    public void Add(IRequest message)  
    {  
        if(!Queue.TryAdd(message, AddTimeoutMS))  
            _logger.Warn("Queue flooded, skipping message");  
    }  
  
    public async Task Consume(CancellationToken cancellationToken)  
    {  
        foreach (IRequest item in Queue.GetConsumingEnumerable(cancellationToken: cancellationToken))  
        {  
            try  
            {  
                // foward each message to it's associated message handler  
                await _mediator.Send(item, cancellationToken);   
            }  
            catch (Exception ex)  
            {  
                _logger.Error(ex, "Unhandled exception in logger consumer");  
            }  
        }  
    }  
}

在内存中 BlockingCollection 和持久队列系统之间进行选择

在简单的内存和更复杂的持久队列系统(如 Apache KafkaRabbitMQ)之间做出决定时,需要考虑某些因素。选择基本版可为快速的内存中消息路由提供快速有效的设置。由于磁盘上缺少数据持久性,如果在应用程序遇到退出或崩溃事件之前未使用消息,则存在消息丢失的风险。此外,与死信排队、消息过期/TTL(生存时间)、延迟消息传递等高级功能相比,简单的 BlockingCollection 存在不足。这些在更复杂的队列平台(如 Apache Kafka 或 RabbitMQ)中可用。BlockingCollectionBlockingCollectionBlockingCollection

掌握并发性是现代软件开发的关键,可以有效利用系统资源并提高整体系统性能。生产者-消费者模式(如 C# 中的 BlockingCollection 类所示)提供了一个强大的工具来无缝管理并发操作。通过在有界集合中同步生产者和使用者,开发人员可以在高吞吐量处理和受控内存使用之间实现微妙的平衡。

我个人在实现日志记录机制方面使用 BlockingCollection 的经验证明了它在实际场景中的实用性,尤其是在响应能力和资源管理至关重要的情况下。利用 BlockingCollection 等并发结构不仅可以简化程序执行,还可以使开发人员能够设计健壮且可扩展的系统,最终将软件开发过程提升到效率和可靠性的新高度。

阅读排行