深入理解C#中的Task类:创建、执行和取消异步操作

作者:微信公众号:【架构师老卢】
1-26 15:27
147

Task 类表示通常异步执行的单个操作。Task 对象是 .NET Framework 4 中首次引入的基于任务的异步模式的核心组件之一。

任务比线程更轻量级。默认任务计划程序在线程池内运行任务。因此,Task 更适合运行小型、快速的异步操作。如果使用任务运行许多长时间运行的任务,则可能会用完线程池中的所有线程,并且新任务必须等待以前的任务完成执行。

创建和运行任务

创建和执行任务的最常见方法是使用 Task.Run() 或 TaskFactory.StartNew() 方法。

Task.Run 和 TaskFactory.StartNew 之间的主要区别在于,可以使用 TaskFactory.StartNew 将参数传递给任务。

将参数传递给任务

public System.Threading.Tasks.Task StartNew (Action<object?> action, object? state);

TaskFactory.StartNew() 的一个重载方法采用两个参数:第一个是异步执行的操作委托,第二个 () 是传递到操作委托的参数。第二个参数是类型。您可以将任何单个对象作为参数传入,并在任务中将其转换为其原始类型。stateobject

取消任务

Task 类支持通过使用 CancellationTokenSource 对象进行取消。此对象具有 Token 属性和 Cancel() 方法。以下是任务取消的工作原理:

在主线程中,创建一个 CancellationTokenSource 对象。 将 CancellationTokenSource.Token 属性传递给创建的所有任务。 定期检查任务中的 CancellationToken.IsCancellationRequested 属性,并相应地处理取消。 在主线程中,调用 CancellationTokenSource.Cancel() 方法。Cancel() 方法会将 CancellationToken.IsCancellationRequested 属性设置为 true。

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    internal class Program
    {
        static void Main(string[] args)
        {
            var tokenSource = new CancellationTokenSource();
            var token = tokenSource.Token;

            Task.Run(() =>
            {
                int count = 1;
                while (!token.IsCancellationRequested)
                {
                    Thread.Sleep(1000);
                    Console.WriteLine($"Sleep {count++} seconds");
                }
                Console.WriteLine($"Cancellation requested.");
            }, token);

            var key = Console.ReadKey();

            while (key.KeyChar != 'c')
            {
                key = Console.ReadKey();
            }

            tokenSource.Cancel();
            Console.WriteLine($"Send Cancel command and exit.");

            Console.ReadKey();
        }
    }
}

取消阻止任务

如您所见,上述取消机制依赖于定期检查 CancellationToken.IsCancellationRequested 属性。如果我的任务是阻塞方法调用怎么办?遗憾的是,没有适用于所有方案的通用解决方案。

如果方法支持 Begin/End 异步 API

如果要调用的阻止方法还提供异步替代方法,则可以利用从 Begin... () 方法创建轮询情境。

if (result.AsyncWaitHandle.WaitOne(1000)) 
{
  // At this point, asynchronous operation completed
  Console.WriteLine("Request processed asyncronously.");
};

WaitOne() 方法中的参数 (1000) 是以毫秒为单位的超时值。返回值 false 表示发生超时;返回值 true 表示异步操作完成。

using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp2
{
    internal class Program
    {
        private static readonly HttpListener listener = new HttpListener { Prefixes = { $"https://localhost:8098/" } };
        static void Main(string[] args)
        {
            CancellationTokenSource tokenSource = new CancellationTokenSource();
            CancellationToken token = tokenSource.Token;
            Task.Run(() =>
            {
                listener.Start();
                int count = 1;
                while (!token.IsCancellationRequested)
                {
                    var result = listener.BeginGetContext(new AsyncCallback(ListenerCallback), listener);
                    Console.WriteLine($"{count++}: Waiting for request to be processed asyncronously.");
                    if (result.AsyncWaitHandle.WaitOne(1000))
                    {
                        Console.WriteLine("Request processed asyncronously.");
                    };
                }
                listener.Close();
            }, token);
            var key = Console.ReadKey();

            while (key.KeyChar != 'c')
            {
                key = Console.ReadKey();
            }

            tokenSource.Cancel();
            Console.WriteLine($"Send Cancel command and exit.");

            Console.ReadKey();
        }

        private static void ListenerCallback(IAsyncResult result)
        {
            HttpListener listener = (HttpListener)result.AsyncState;
            if (listener.IsListening)
            {
                HttpListenerContext context = listener.EndGetContext(result);
                HttpListenerResponse response = context.Response;
                string responseString = "<HTML><BODY> Hello world!</BODY></HTML>";
                byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseString);
                response.ContentLength64 = buffer.Length;
                System.IO.Stream output = response.OutputStream;
                output.Write(buffer, 0, buffer.Length);
                output.Close();
            }
        }
    }
}

如果该方法可以被其他操作中断

这更具体地适用于您调用的方法。有些方法可能支持这种情况,有些方法不支持。让我们以 HttpListener 为例。HttpListener 具有 GetContextAsync() 方法。此方法将阻止任务。但是,如果调用 HttpListener.Stop() 方法,它将引发 HttpListenerException,从而取消阻止 GetContextAsync() 方法。

当应用程序被 GetContextAsync() 方法阻止时,您将如何调用 HttpListener.Stop() 方法?您可以使用 CancellationToken.Register() 方法注册一个委托,该委托将在取消此 CancellationToken 时调用。

using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp3
{
    internal class Program
    {
        private static readonly HttpListener listener = new HttpListener { Prefixes = { $"https://localhost:8098/" } };
        static void Main(string[] args)
        {
            CancellationTokenSource tokenSource = new CancellationTokenSource();
            CancellationToken token = tokenSource.Token;
            token.Register(() =>
            {
                if (listener.IsListening)
                {
                    listener.Stop();
                }
            });
            Task.Run(async () =>
            {
                listener.Start();
                while (!token.IsCancellationRequested)
                {
                    try
                    {
                        Console.WriteLine("Waiting for request to come...");
                        var context = await listener.GetContextAsync();
                        SendResponse(context);
                        Console.WriteLine("Request processed asyncronously.");
                    }
                    catch (Exception e)
                    {
                        if (e is HttpListenerException)
                        {
                            //this gets thrown when the listener is stopped
                            Console.WriteLine($"Task received cancel request and exit.");
                            return;
                        }
                        Console.WriteLine(e.Message);
                    }
                }
                listener.Close();
            }, token);
            var key = Console.ReadKey();

            while (key.KeyChar != 'c')
            {
                key = Console.ReadKey();
            }

            tokenSource.Cancel();
            Console.WriteLine($"Main thread sent Cancel command and exit.");

            Console.ReadKey();
        }

        private static void SendResponse(HttpListenerContext context)
        {
            Console.WriteLine($"Send hello word response.");

            HttpListenerResponse response = context.Response;
            string responseString = "<HTML><BODY> Hello world!</BODY></HTML>";
            byte[] buffer = System.Text.Encoding.UTF8.GetBytes(responseString);
            response.ContentLength64 = buffer.Length;
            System.IO.Stream output = response.OutputStream;
            output.Write(buffer, 0, buffer.Length);
            output.Close();
        }
    }
}
相关留言评论
昵称:
邮箱:
阅读排行