C#中的异步、多线程和并行编程

作者:微信公众号:【架构师老卢】
6-8 8:18
30

概述:本文将介绍 .NET 中异步编程的基础知识,包括 和 关键字的使用、任务的作用以及它们如何与线程池交互。我们将探讨演示异步方法、处理异步代码中的异常以及通过并行编程提高性能的实际示例。此外,我们还将讨论编写高效异步代码的最佳实践以及要避免的常见陷阱。在本文结束时,你将对如何在 .NET 应用程序中实现异步编程有深入的了解。异步编程.NET 中的异步编程允许程序在不阻塞主线程的情况下执行任务,从而使程序保持响应。这对于可能需要一些时间的任务特别有用,例如I / O 操作、网络请求或任何其他长时间运行的进程。.NET 中异步编程的关键概念:Async 和 Await: 这些是用于定义异步方法的关键

本文将介绍 .NET 中异步编程的基础知识,包括 和 关键字的使用、任务的作用以及它们如何与线程池交互。我们将探讨演示异步方法、处理异步代码中的异常以及通过并行编程提高性能的实际示例。此外,我们还将讨论编写高效异步代码的最佳实践以及要避免的常见陷阱。在本文结束时,你将对如何在 .NET 应用程序中实现异步编程有深入的了解。

异步编程

.NET 中的异步编程允许程序在不阻塞主线程的情况下执行任务,从而使程序保持响应。这对于可能需要一些时间的任务特别有用,例如I / O 操作、网络请求或任何其他长时间运行的进程。

.NET 中异步编程的关键概念:

  1. Async 和 Await: 这些是用于定义异步方法的关键字。
  • async:用于将方法声明为异步方法。
  • await:用于暂停异步方法的执行,直到等待的任务完成。

2. 任务: 表示异步操作。该类用于处理和控制这些操作。

与线程池的关系

线程池

由 .NET 管理的用于执行后台任务的线程集合。.NET 不会在每次执行异步任务时创建新线程,而是使用此池中的线程来优化性能。

使用 和 时,该方法不会阻塞主线程。相反,它在线程池中的线程上运行。等待的任务完成后,它将返回到调用它的上下文,这通常是主线程。

执行

public async Task<int> FetchDataCountAsync()  
{  
    // 1. Call a synchronous method to get all products.   
    // This runs on the current thread.  
    var products = productService.GetAll();  
  
    // 2. Calculate the length of the products array.   
    // This also runs on the current thread.  
    var productLength = products.length();  
  
    // 3. Call an asynchronous method to get all categories.   
    // This does not block the current thread.  
    // The control returns to the caller until this task is completed.  
    var categories = await catService.GetAll();  
    // The method pauses here until catService.GetAll() completes.  
    // Once completed, the result is assigned to the 'categories' variable.  
  
    // 4. Calculate the length of the categories array.   
    // This runs on the current thread after the await completes.  
    var catLength = categories.Length();  
  
    // 5. Call an asynchronous method to get all limits.   
    // This does not block the current thread.  
    // The control returns to the caller until this task is completed.  
    var limits = await limitService.getAll();  
    // The method pauses here until limitService.getAll() completes.  
    // Once completed, the result is assigned to the 'limits' variable.  
  
    // 6. Calculate the length of the limits array.   
    // This runs on the current thread after the await completes.  
    var limitLength = limits.length();  
  
    // 7. This runs on the current thread.  
    return productLength + catLength + limitLength;  
}

解释

异步与同步

在 .NET 中,同步方法通常不会直接与线程池交互,除非它们显式使用线程池,例如 via 或 .默认情况下,同步方法在调用它们的当前线程上运行。但是,您可以使用线程池在后台线程中运行同步方法,从而释放主线程。

执行

public int FetchDataSync()  
{  
    // 1. Call to get all products.   
    // This runs on the current thread and blocks until it completes.  
    var products = productService.GetAll();  
  
    // 2. Calculate the length of the products array.   
    // This runs on the current thread after the previous line completes.  
    var productLength = products.length();  
  
    // 3. Call to get all categories.   
    // This runs on the current thread and blocks until it completes.  
    var categories = catService.GetAll();  
  
    // 4. Calculate the length of the categories array.   
    // This runs on the current thread after the previous line completes.  
    var catLength = categories.Length();  
  
    // 5. Call to get all limits.   
    // This runs on the current thread and blocks until it completes.  
    var limits = limitService.getAll();  
  
    // 6. Calculate the length of the limits array.   
    // This runs on the current thread after the previous line completes.  
    var limitLength = limits.length();  
  
    // 7. This runs on the current thread after the previous lines complete.  
    return productLength + catLength + limitLength;  
}

解释

多线程编程

C# 中的多线程编程涉及**在单个应用程序中创建和管理多个线程。**这允许应用程序同时执行多个任务,从而提高性能和响应能力,尤其是在多核处理器上。

关键概念

  1. 线程: 可以计划执行的进程的最小单位。在 C# 中,可以使用该类创建和管理线程。
  2. 线程池: 由 .NET Framework 管理的工作线程池。它处理线程创建和管理,这有助于提高性能和资源管理。
  3. 任务: 线程的更高层次的抽象。任务是任务并行库 (TPL) 的一部分,它提供了一种更高效、更简单的异步操作处理方法。

方案 1:线程结果之间没有依赖关系

using System;  
using System.Threading;  
  
class Program  
{  
    static void Main()  
    {  
        // Create three threads  
        Thread thread1 = new Thread(new ThreadStart(Activity1));  
        Thread thread2 = new Thread(new ThreadStart(Activity2));  
        Thread thread3 = new Thread(new ThreadStart(Activity3));  
  
        // Start the threads  
        thread1.Start();  
        thread2.Start();  
        thread3.Start();  
  
        // Wait for threads to complete  
        thread1.Join();  
        thread2.Join();  
        thread3.Join();  
  
        Console.WriteLine("All activities completed.");  
    }  
  
    static void Activity1()  
    {  
    }  
  
    static void Activity2()  
    {  
    }  
  
    static void Activity3()  
    {  
    }  
}

方案 2:线程结果之间的依赖关系

线程 1 的输出是启动线程 2 所必需的。使用简单的同步机制在线程之间发出信号。

using System;  
using System.Threading;  
  
class Program  
{  
    static ManualResetEvent activity1Completed = new ManualResetEvent(false);  
    static string sharedData;  
  
    static void Main()  
    {  
        // Create three threads  
        Thread thread1 = new Thread(new ThreadStart(Activity1));  
        Thread thread2 = new Thread(new ThreadStart(Activity2));  
        Thread thread3 = new Thread(new ThreadStart(Activity3));  
  
        // Start the threads  
        thread1.Start();  
        thread2.Start();  
        thread3.Start();  
  
        // Wait for threads to complete  
        thread1.Join();  
        thread2.Join();  
        thread3.Join();  
  
        Console.WriteLine("All activities completed.");  
    }  
  
    static void Activity1()  
    {  
        // Set shared data and signal completion  
        sharedData = "Data from Activity 1";  
        activity1Completed.Set();  
    }  
  
    static void Activity2()  
    {  
        // Wait for Activity 1 to complete  
        activity1Completed.WaitOne();  
  
        // Implementation  
        var Act1Results = sharedData;       
    }  
  
    static void Activity3()  
    {  
    }  
}

方案 3:处理异常

using System;  
using System.Threading;  
using System.Threading.Tasks;  
  
class Program  
{  
    static CancellationTokenSource cts = new CancellationTokenSource();  
  
    static void Main()  
    {  
        // Create and start tasks  
        Task task1 = Task.Run(() => Activity1(cts.Token), cts.Token);  
        Task task2 = Task.Run(() => Activity2(cts.Token), cts.Token);  
        Task task3 = Task.Run(() => Activity3(cts.Token), cts.Token);  
  
        try  
        {  
            // Wait for all tasks to complete  
            Task.WaitAll(task1, task2, task3);  
        }  
        catch (AggregateException ex)  
        {  
            // Handle the exception  
            foreach (var innerEx in ex.InnerExceptions)  
            {  
                Console.WriteLine($"Exception: {innerEx.Message}");  
            }  
  
            // Revert changes here  
            RevertChanges();  
  
            // Signal that the tasks were cancelled  
            Console.WriteLine("All activities stopped and changes reverted.");  
        }  
    }  
  
    static void Activity1(CancellationToken token)  
    {  
        try  
        {  
            throw new Exception("Error in Activity 1");  
        }  
        catch (Exception ex)  
        {  
            cts.Cancel(); // Cancel all tasks  
            throw; // Re-throw the exception to be caught by Task.WaitAll  
        }  
    }  
  
    static void Activity2(CancellationToken token)  
    {  
        try  
        {  
        }  
        catch (OperationCanceledException)  
        {  
            Console.WriteLine("Activity 2 cancelled.");  
        }  
    }  
  
    static void Activity3(CancellationToken token)  
    {  
        try  
        {  
        }  
        catch (OperationCanceledException)  
        {  
            Console.WriteLine("Activity 3 cancelled.");  
        }  
    }  
  
    static void RevertChanges()  
    {  
        // Implement the logic to revert changes here  
        Console.WriteLine("Reverting changes...");  
    }  
}

任务Paralel库(TPL)

TPL 是 System.Threading.Tasks 命名空间中的一组公共类型和 API,可用于轻松编写并行和异步代码。以下是 TPL 的一些主要功能和优势:

1. 创建和启动任务

using System;  
using System.Threading.Tasks;  
  
class Program  
{  
    static void Main(string[] args)  
    {  
        Task task = Task.Run(() =>   
        {  
            // Your code here  
            Console.WriteLine("Task is running.");  
        });  
  
        task.Wait(); // Waits for the task to complete  
    }  
}

2. 从任务中返回结果

using System;  
using System.Threading.Tasks;  
  
class Program  
{  
    static void Main(string[] args)  
    {  
        Task<int> task = Task.Run(() =>   
        {  
            // Your code here  
            return 42;  
        });  
  
        int result = task.Result; // Blocks and gets the result  
        Console.WriteLine($"Result: {result}");  
    }  
}

3. 使用 和 进行异步编程asyncawait

using System;  
using System.Threading.Tasks;  
  
class Program  
{  
    static async Task Main(string[] args)  
    {  
        int result = await GetNumberAsync();  
        Console.WriteLine($"Result: {result}");  
    }  
  
    static Task<int> GetNumberAsync()  
    {  
        return Task.Run(() =>   
        {  
            // Simulate work  
            Task.Delay(2000).Wait();  
            return 42;  
        });  
    }  
}

4. 与类并行编程Parallel

using System;  
using System.Threading.Tasks;  
  
class Program  
{  
    static void Main(string[] args)  
    {  
        Parallel.For(0, 10, i =>   
        {  
            Console.WriteLine($"Processing {i}");  
        });  
  
        string[] words = { "one", "two", "three" };  
        Parallel.ForEach(words, word =>   
        {  
            Console.WriteLine($"Processing {word}");  
        });  
    }  
}

5. 延续任务

可以使用延续任务将任务链接在一起。

using System;  
using System.Threading.Tasks;  
  
class Program  
{  
    static void Main(string[] args)  
    {  
        Task task = Task.Run(() =>   
        {  
            Console.WriteLine("Initial task.");  
        });  
  
        task.ContinueWith(t =>   
        {  
            Console.WriteLine("Continuation task.");  
        }).Wait();  
    }  
}

6. 任务中的异常处理

using System;  
using System.Threading.Tasks;  
  
class Program  
{  
    static void Main(string[] args)  
    {  
        Task task = Task.Run(() =>   
        {  
            throw new InvalidOperationException("Something went wrong.");  
        });  
  
        try  
        {  
            task.Wait();  
        }  
        catch (AggregateException ex)  
        {  
            foreach (var innerEx in ex.InnerExceptions)  
            {  
                Console.WriteLine(innerEx.Message);  
            }  
        }  
    }  
}

7. Task.WaitAll 和 Task.WhenAll

等待多个任务完成,使用 和Task.WaitAllTask.WhenAll

using System;  
using System.Threading.Tasks;  
  
class Program  
{  
    static async Task Main(string[] args)  
    {  
        Task task1 = Task.Run(() => Task.Delay(1000));  
        Task task2 = Task.Run(() => Task.Delay(2000));  
  
        // Blocking wait  
        Task.WaitAll(task1, task2);  
  
        // Async wait  
        await Task.WhenAll(task1, task2);  
    }  
}
  • Task.WaitAll:阻止直到所有任务完成。
  • Task.WhenAll:返回在所有任务完成时完成的任务。

8. Task.WaitAny 和 Task.WhenAny

等待多个任务中的任何一个完成

using System;  
using System.Threading.Tasks;  
  
class Program  
{  
    static async Task Main(string[] args)  
    {  
        Task task1 = Task.Run(() => Task.Delay(1000));  
        Task task2 = Task.Run(() => Task.Delay(2000));  
  
        // Blocking wait  
        int index = Task.WaitAny(task1, task2);  
        Console.WriteLine($"Task {index + 1} completed first.");  
  
        // Async wait  
        Task firstTask = await Task.WhenAny(task1, task2);  
        Console.WriteLine("First task completed.");  
    }  
}
  • Task.WaitAny:阻止,直到任何一个任务完成。
  • Task.WhenAny:返回一个任务,该任务在任何一个任务完成时完成。

9. 取消任务

using System;  
using System.Threading;  
using System.Threading.Tasks;  
  
class Program  
{  
    static async Task Main(string[] args)  
    {  
        CancellationTokenSource cts = new CancellationTokenSource();  
  
        Task task = Task.Run(() =>   
        {  
            for (int i = 0; i < 10; i++)  
            {  
                if (cts.Token.IsCancellationRequested)  
                {  
                    Console.WriteLine("Task cancelled.");  
                    return;  
                }  
  
                Task.Delay(1000).Wait();  
                Console.WriteLine($"Task running {i}");  
            }  
        }, cts.Token);  
  
        await Task.Delay(3000);  
        cts.Cancel();  
        await task;  
    }  
}
阅读排行