.NET 9:Task.WhenEach 和优先通道

作者:微信公众号:【架构师老卢】
10-4 8:38
188

.NET 9 引入了几个新的线程功能,这些功能简化了开发人员使用异步任务和通道的方式。在本文中,我们将探讨新的 API 和添加的优先无界通道,并提供代码示例来演示它们的用法。Task.WhenEach

Task.WhenEach:简化任务迭代

最有用的新增功能之一是 API。以前,在任务完成时迭代任务需要复杂的模式,例如调用以检索下一个已完成的任务。Task.WhenEachTask.WaitAny

借助 ,您可以使用循环在任务完成时迭代任务,从而显著简化任务管理。Task.WhenEachawait foreach

我们来看看这个 API 在进行异步 I/O 调用的实际场景中是如何工作的。

using System;
using System.Net.Http;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        using HttpClient httpClient = new HttpClient();
        
        // Example URLs to fetch data from
        Task<string> firstTask = httpClient.GetStringAsync("https://api.github.com");
        Task<string> secondTask = httpClient.GetStringAsync("https://jsonplaceholder.typicode.com/posts/1");
        Task<string> thirdTask = httpClient.GetStringAsync("https://jsonplaceholder.typicode.com/posts/2");

        // Use Task.WhenEach to process tasks as they complete
        await foreach (Task<string> completedTask in Task.WhenEach(firstTask, secondTask, thirdTask))
        {
            try
            {
                Console.WriteLine($"Response Length: {completedTask.Result.Length}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Task encountered an error: {ex.Message}");
            }
        }
    }
}
  • 三个异步任务向不同的 URL 发出 HTTP 请求。
  • 该循环允许我们在每个任务完成时对其进行处理,而无需等待所有任务完成。await foreach
  • 每个任务的结果都会立即得到处理,从而允许更灵敏、更高效的任务处理,尤其是在 I/O 密集型操作中。

这个新的 API 非常适合需要在任务结果可用时立即对任务结果执行操作,而不必等待所有任务完成的情况。

优先无界通道

在 .NET 中,通道是用于程序不同部分之间通信的数据结构。它通常用于具有多个生产者(添加数据)和使用者(处理数据)的场景。通道的工作方式与队列类似,但它们对数据的处理方式提供了更多控制,尤其是在并发或多线程环境中。

通道允许在不同任务之间进行安全、线程友好的通信。两种主要类型的通道是:

  1. 有界通道:它们对可以存储的项目数量有限制。
  2. 无界频道:它们可以存储无限数量的项目。

默认情况下,渠道按添加顺序 (FIFO) 处理项目。但是,随着 .NET 9 中引入优先通道,您现在可以根据优先级处理项目,而不仅仅是根据项目的添加顺序。

以下是优先渠道的运作方式细分:

using System;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Collections.Generic;

class Program
{
    static async Task Main(string[] args)
    {
        // Create a channel that prioritizes smaller numbers (lower numbers processed first)
        Channel<int> priorityChannel = Channel.CreateUnboundedPrioritized<int>(Comparer<int>.Create((x, y) => x.CompareTo(y)));

        // Add numbers in random order
        await priorityChannel.Writer.WriteAsync(10);
        await priorityChannel.Writer.WriteAsync(3);
        await priorityChannel.Writer.WriteAsync(7);

        // Complete the writer so no more data is written
        priorityChannel.Writer.Complete();

        // Read items from the channel in prioritized order (smallest to largest)
        while (await priorityChannel.Reader.WaitToReadAsync())
        {
            while (priorityChannel.Reader.TryRead(out int item))
            {
                Console.WriteLine($"Processed item: {item}");
            }
        }
    }
}
  • Channel Creation:使用创建渠道,以设置优先渠道。自定义比较器可确保将较小的数字视为具有更高的优先级。Channel.CreateUnboundedPrioritized<int>()(x, y) => x.CompareTo(y)
  • 写入通道:将三个数字(10、3 和 7)添加到通道中。它们以随机顺序添加。
  • Completing the Writer:调用 表示不会再向通道写入更多项目。priorityChannel.Writer.Complete()
  • Reading from the Channel:代码等待读取器使用 .WaitToReadAsync()

从通道读取项目时,首先处理较小的数字 (3),然后是 7,最后是 10。

Why Use Prioritized Channels?

  • 任务优先级: 在某些应用程序中,您可能希望先于其他任务处理优先级较高的任务(例如,实时系统或应首先完成关键操作的任务调度)。
  • 控制执行:与传统的 FIFO 通道不同,优先通道允许您根据重要性而不是时间定义应如何处理任务或数据。
相关留言评论
昵称:
邮箱:
阅读排行