在 .NET Core 3 中引入异步流,由接口表示,并在 C# 8 中直接支持通过定义为方法结果并使用来迭代使用或轻松实现新的异步流,就像我们所做的那样,Microsoft 标准化了 .NET 开发人员实现异步流的方式。IAsyncEnumerable<T>await foreachasync IAsyncEnumerable<T>yield return/yield breakIEnumerable<T>
即使我们没有意识到,我们可能每天都会使用异步流,从 Entity Framework Core 到 ASP.NET Core,它已成为 .NET 的重要组成部分,现在被广泛采用。
在本文中,我将展示我在直接工作时通常遇到的一些最常见的情况,以及我通常如何解决这些问题。IAsyncEnumerable<T>
在为异步流实现扩展时,需要牢记一些准则。
首先,不要在方法中验证函数输入(如 null 检查),而是创建一个包装器方法,用于验证并调用内部实现。这将使堆栈跟踪更加干净、易于分析和轻量级。async
其次,你的代码肯定需要一个,但不要在公共方法签名中定义它,即使它有一个默认值。为了使开发人员的工作更轻松,Microsoft 提供了可用于任何方法的扩展方法,而不是将 a 传递给每个方法,如果在方法签名中使用该属性,编译器将自动使用传递给该方法的取消令牌到所有方法。如果将其定义为公共签名中的参数,则会给使用它的开发人员造成混淆。CancellationTokenCancellationTokenWithCancellationIAsyncEnumerableEnumeratorCancellationWithCancellation
想象一下,您正在创建一个简单的扩展来过滤异步流,以遵循以下准则实现:Where
public static IAsyncEnumerable<T> Where<T>(
this IAsyncEnumerable<T> source,
Func<T, bool> predicate
)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(predicate);
return Core(source, predicate);
static async IAsyncEnumerable<T> Core(
IAsyncEnumerable<T> source,
Func<T, bool> predicate,
[EnumeratorCancellation] CancellationToken ct = default
)
{
await foreach (var item in source.WithCancellation(ct))
{
if (predicate(item))
yield return item;
}
}
}
正如你所看到的,验证是在实际实现之外完成的,就像只在内部定义一样。CancellationToken
我建议仔细查看 的源代码,这是官方支持的异步流 LINQ 扩展。System.Linq.Async
现在让我们深入了解一些我最常用的实用程序扩展。
从 接收数据时,我们可能不知道总共需要多少个项目以及总共需要多长时间,但我们可能希望在接收每个项目之间强制执行最大超时,以确保应用程序不会无限期等待。IAsyncEnumerable
现在一个非常常见的方案是与大型语言模型 (LLM) API 集成,这些 API 使用服务器发送事件 (SSE) 来流式传输响应令牌,以便实时显示给用户。即使某处加载,如果某些令牌响应花费的时间太长,用户可能会认为应用程序刚刚停止工作。
以下代码将 a 链接到原始代码,并使用其内部计时器在给定时间通过后自动取消,在等待下一项时抛出 a。CancellationTokenSourceCancellationTokenTaskCanceledException
public static IAsyncEnumerable<T> Timeout<T>(
this IAsyncEnumerable<T> source,
int millisecondsTimeout
)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentOutOfRangeException.ThrowIfLessThan(millisecondsTimeout, -1);
return Core(source, millisecondsTimeout);
static async IAsyncEnumerable<T> Core(
IAsyncEnumerable<T> source,
int millisecondsTimeout,
[EnumeratorCancellation] CancellationToken ct = default
)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(millisecondsTimeout);
await foreach (var element in source.WithCancellation(ct))
{
// disable the timer while the item is being processed
cts.CancelAfter(-1);
yield return element;
// re-enable the timer before fetching the next item
cts.CancelAfter(millisecondsTimeout);
}
}
}
我喜欢这种方法,因为它很简单,而且除了额外的分配和内部的(将按项目重用)之外,没有什么可说的,使其成为一个非常有效的实现。它可以像任何 LINQ 方法一样使用:CancellationTokenSourceTimer
await foreach (var i in GetRandomIntegersAsync(20)
.Timeout(500)
.WithCancellation(ct))
{
Console.WriteLine($"{DateTimeOffset.Now:O} -> {i}");
}
如果你像我一样更喜欢接收一个,你可以通过手动遍历枚举器而不是使用来更改实现,如果抛出一个但收到的没有取消,你可以假设这是由于超时并抛出一个。TimeoutExceptionawait foreachTaskCanceledExceptionCancellationTokenTimeoutException
public static IAsyncEnumerable<T> Timeout<T>(
this IAsyncEnumerable<T> source,
int millisecondsTimeout
)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentOutOfRangeException.ThrowIfLessThan(millisecondsTimeout, -1);
return Core(source, millisecondsTimeout);
static async IAsyncEnumerable<T> Core(
IAsyncEnumerable<T> source,
int millisecondsTimeout,
[EnumeratorCancellation] CancellationToken ct = default
)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(millisecondsTimeout);
await using var enumerator = source.GetAsyncEnumerator(cts.Token);
while (await MoveNextCheckTimeoutAsync(enumerator, ct))
{
cts.CancelAfter(-1);
yield return enumerator.Current;
cts.CancelAfter(millisecondsTimeout);
}
}
static async ValueTask<bool> MoveNextCheckTimeoutAsync(
IAsyncEnumerator<T> enumerator,
CancellationToken ct
)
{
try
{
return await enumerator.MoveNextAsync();
}
catch (TaskCanceledException e) when (!ct.IsCancellationRequested)
{
throw new TimeoutException("The next item took longer than expected to be received", e);
}
}
}
当从要保留的数据库接收数据时,为了减少和优化插入,最好对其进行批处理,而不是逐项存储。IAsyncEnumerable
想象一下,您正在将数据存储到某个关系表中,如果您收到 500 个项目,则有 500 次数据库访问,这可能会影响整个应用程序的性能。相反,如果您创建 10 个项目的批处理,则意味着 50 次数据库访问,中间间隔更多时间,这肯定会减少整体数据库负载。
如果我们使用 MoreLINQ 方法作为示例,我们可以很容易地将其转换为实现。BatchIAsyncEnumerable
public static IAsyncEnumerable<T[]> Batch<T>(
this IAsyncEnumerable<T> source,
int size
)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(size);
return Core(source, size);
static async IAsyncEnumerable<T[]> Core(
IAsyncEnumerable<T> source,
int size,
[EnumeratorCancellation] CancellationToken ct = default
)
{
T[] batch = null;
var count = 0;
await foreach (var item in source.WithCancellation(ct))
{
batch ??= new T[size];
batch[count++] = item;
if (count != size)
continue;
yield return batch;
batch = null;
}
if (count > 0)
{
Array.Resize(ref batch, count);
yield return batch;
}
}
}
批处理项时的另一种常见方案是支持超时参数,该参数将返回完整的批处理,或者在给定时间过后返回已接收的任何项。
这可以通过计算超时日期并检查每次收到项目时是否已过来轻松实现。如果已过给定的时间量,只需将集合大小调整为当前大小,返回该批处理并重新计算下一个超时日期即可。
public static IAsyncEnumerable<T[]> Batch<T>(
this IAsyncEnumerable<T> source,
int size,
int millisecondsTimeout
)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(size);
ArgumentOutOfRangeException.ThrowIfNegative(millisecondsTimeout);
return Core(source, size, millisecondsTimeout);
static async IAsyncEnumerable<T[]> Core(
IAsyncEnumerable<T> source,
int size,
int millisecondsTimeout,
[EnumeratorCancellation] CancellationToken ct = default
)
{
T[] batch = null;
var count = 0;
var timeoutOn = DateTime.UtcNow.AddMilliseconds(millisecondsTimeout);
await foreach (var item in source.WithCancellation(ct))
{
batch ??= new T[size];
batch[count++] = item;
if (count != size)
{
if (timeoutOn > DateTime.UtcNow)
continue;
Array.Resize(ref batch, count);
}
yield return batch;
batch = null;
timeoutOn = DateTime.UtcNow.AddMilliseconds(millisecondsTimeout);
count = 0;
}
if (count > 0)
{
Array.Resize(ref batch, count);
yield return batch;
}
}
}
我喜欢这种方法,因为它非常易于理解且高效,因为它只在初始批处理方法上分配了一个额外的方法。唯一的缺点是它仅在收到物品_后_超时,而不是在等待时超时。为了解决这个限制,实现将需要一个计时器和常量分配,至少对于我的用例来说,复杂性和性能过载是不值得的。DateTimeTaskCompletitionSource
同样,此方法可以像其他 LINQ 扩展一样轻松使用:
await foreach (var i in GetRandomIntegersAsync(20)
.Batch(5, 1000)
.Where(e => e.Length > 0)
.WithCancellation(ct))
{
Console.WriteLine($"{DateTimeOffset.Now:O} -> {i.Length}");
}
我通常面临的最后一种情况是应用一些限制,以防止在从 接收的项目吞吐量高的情况下使应用程序过载。它通常与超时方法很好地结合在一起,以强制执行数据的预测节奏。IAsyncEnumerableBatch
实现非常简单:计算超时日期,获取下一个项目,如果没有足够的时间过去,只需对剩余时间进行延迟,然后对每个项目重复该过程。
public static IAsyncEnumerable<T> Throttling<T>(
this IAsyncEnumerable<T> source,
int millisecondsDelay
)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentOutOfRangeException.ThrowIfNegative(millisecondsDelay);
return Core(source, millisecondsDelay);
static async IAsyncEnumerable<T> Core(
IAsyncEnumerable<T> source,
int millisecondsDelay,
[EnumeratorCancellation] CancellationToken ct = default
)
{
var timeoutOn = DateTime.UtcNow.AddMilliseconds(millisecondsDelay);
await foreach (var item in source.WithCancellation(ct))
{
if (DateTime.UtcNow < timeoutOn)
await Task.Delay(timeoutOn - DateTime.UtcNow, ct);
yield return item;
timeoutOn = DateTime.UtcNow.AddMilliseconds(millisecondsDelay);
}
}
}
再一次,像其他 LINQ 扩展一样使用它:
await foreach (var i in GetRandomIntegersAsync(20)
.Throttling(5000)
.WithCancellation(ct))
{
Console.WriteLine($"{DateTimeOffset.Now:O} -> {i}");
}
在本文中,我展示了我在使用时使用的一些最常见的实用程序方法,允许在等待下一项时实现超时,以最大等待时间进行批处理或强制限制以防止应用程序过载。IAsyncEnumerable
随意以对您有意义的方式更改它们,例如创建接收 的重载,或者将它们用作创建自己的实用程序方法的示例。TimeSpan
下面是完整的代码示例,因此您可以将其放入自己的项目中。
namespace System.Collections.Generic;
public static class AsyncEnumerableExtensions
{
public static IAsyncEnumerable<T> Timeout<T>(
this IAsyncEnumerable<T> source,
int millisecondsTimeout
)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentOutOfRangeException.ThrowIfLessThan(millisecondsTimeout, -1);
return Core(source, millisecondsTimeout);
static async IAsyncEnumerable<T> Core(
IAsyncEnumerable<T> source,
int millisecondsTimeout,
[EnumeratorCancellation] CancellationToken ct = default
)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(millisecondsTimeout);
await using var enumerator = source.GetAsyncEnumerator(cts.Token);
while (await MoveNextCheckTimeoutAsync(enumerator, ct))
{
cts.CancelAfter(-1);
yield return enumerator.Current;
cts.CancelAfter(millisecondsTimeout);
}
}
static async ValueTask<bool> MoveNextCheckTimeoutAsync(
IAsyncEnumerator<T> enumerator,
CancellationToken ct
)
{
try
{
return await enumerator.MoveNextAsync();
}
catch (TaskCanceledException e) when (!ct.IsCancellationRequested)
{
throw new TimeoutException("The next item took longer than expected to be received", e);
}
}
}
public static IAsyncEnumerable<T[]> Batch<T>(
this IAsyncEnumerable<T> source,
int size
)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(size);
return Core(source, size);
static async IAsyncEnumerable<T[]> Core(
IAsyncEnumerable<T> source,
int size,
[EnumeratorCancellation] CancellationToken ct = default
)
{
T[] batch = null;
var count = 0;
await foreach (var item in source.WithCancellation(ct))
{
batch ??= new T[size];
batch[count++] = item;
if (count != size)
continue;
yield return batch;
batch = null;
}
if (count > 0)
{
Array.Resize(ref batch, count);
yield return batch;
}
}
}
public static IAsyncEnumerable<T[]> Batch<T>(
this IAsyncEnumerable<T> source,
int size,
int millisecondsTimeout
)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(size);
ArgumentOutOfRangeException.ThrowIfNegative(millisecondsTimeout);
return Core(source, size, millisecondsTimeout);
static async IAsyncEnumerable<T[]> Core(
IAsyncEnumerable<T> source,
int size,
int millisecondsTimeout,
[EnumeratorCancellation] CancellationToken ct = default
)
{
T[] batch = null;
var count = 0;
var timeoutOn = DateTime.UtcNow.AddMilliseconds(millisecondsTimeout);
await foreach (var item in source.WithCancellation(ct))
{
batch ??= new T[size];
batch[count++] = item;
if (count != size)
{
if (timeoutOn > DateTime.UtcNow)
continue;
Array.Resize(ref batch, count);
}
yield return batch;
batch = null;
timeoutOn = DateTime.UtcNow.AddMilliseconds(millisecondsTimeout);
count = 0;
}
if (count > 0)
{
Array.Resize(ref batch, count);
yield return batch;
}
}
}
public static IAsyncEnumerable<T> Throttling<T>(
this IAsyncEnumerable<T> source,
int millisecondsDelay
)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentOutOfRangeException.ThrowIfNegative(millisecondsDelay);
return Core(source, millisecondsDelay);
static async IAsyncEnumerable<T> Core(
IAsyncEnumerable<T> source,
int millisecondsDelay,
[EnumeratorCancellation] CancellationToken ct = default
)
{
var timeoutOn = DateTime.UtcNow.AddMilliseconds(millisecondsDelay);
await foreach (var item in source.WithCancellation(ct))
{
if (DateTime.UtcNow < timeoutOn)
await Task.Delay(timeoutOn - DateTime.UtcNow, ct);
yield return item;
timeoutOn = DateTime.UtcNow.AddMilliseconds(millisecondsDelay);
}
}
}
}