.Net Core 中的线程同步

作者:微信公众号:【架构师老卢】
3-14 18:25
90

.NET Core 中的线程同步是开发高性能并发应用程序的关键方面。它确保多个线程可以安全地访问资源,而不会导致数据损坏或不一致。本文深入探讨了 .NET Core 中线程同步的核心概念,为各种方案提供了深入的说明和 C# 编码示例。🚀

1. 了解线程同步 🤔

在多线程应用程序中,线程通常需要与共享资源(如数据结构、文件或数据库)进行交互。如果没有适当的同步,多个线程的并发访问可能会导致争用条件、死锁和数据损坏等问题。

2. 同步原语 🛠️

.NET Core 提供了多个同步基元来处理不同的同步需求:.NET Core provides several synchronization primitives to handle different synchronization needs:

  • 互斥锁:确保一次只有一个线程可以访问资源。
  • 信号量:限制可以访问特定资源或资源池的线程数。
  • 监视器:通过获取对象上的锁来提供互斥机制。
  • ReaderWriterLockSlim:允许多个线程读取或以独占方式写入资源。

3. 互斥锁:独家访问 🚪

问题陈述:假设您有一个关键代码段用于更新共享资源(例如文件或数据库条目),并且您需要确保一次只有一个线程可以执行此部分以防止数据损坏。

使用 C# 示例的解决方案:

using System;  
using System.Threading;  
  
class Program  
{  
    private static Mutex mutex = new Mutex();  
    static void Main(string[] args)  
    {  
        for (int i = 0; i < 5; i++)  
        {  
            Thread t = new Thread(WriteToSharedResource);  
            t.Name = $"Thread{i + 1}";  
            t.Start();  
        }  
    }  
  
    static void WriteToSharedResource()  
    {  
        Console.WriteLine($"{Thread.CurrentThread.Name} is requesting the mutex");  
        mutex.WaitOne(); // Requesting the mutex  
        Console.WriteLine($"{Thread.CurrentThread.Name} has entered the critical section");  
  
        // Simulate work (e.g., writing to a shared resource)  
        Thread.Sleep(1000);  
  
        Console.WriteLine($"{Thread.CurrentThread.Name} is leaving the critical section");  
        mutex.ReleaseMutex(); // Releasing the mutex  
    }  
}

此示例演示如何使用 a 来确保一次只有一个线程可以访问关键部分,从而防止并发访问问题。Mutex

4. 信号量:限制并发访问 🚦

问题陈述:假设您有一个处理传入请求的应用程序,但由于资源限制(例如,数据库连接),您只能同时处理一定数量的请求。

使用 C# 示例的解决方案:

using System;  
using System.Threading;  
  
class Program  
{  
    static SemaphoreSlim semaphore = new SemaphoreSlim(3); // Allows up to 3 threads to enter  
  
    static void Main(string[] args)  
    {  
        for (int i = 0; i < 10; i++)  
        {  
            Thread t = new Thread(EnterSemaphore);  
            t.Start(i);  
        }  
    }  
  
    static void EnterSemaphore(object id)  
    {  
        Console.WriteLine($"Request {id} is waiting to enter");  
        semaphore.Wait(); // Request to enter the semaphore  
        Console.WriteLine($"Request {id} has entered");  
  
        // Simulating work  
        Thread.Sleep(1000);  
  
        Console.WriteLine($"Request {id} is leaving");  
        semaphore.Release(); // Release the semaphore  
    }  
}

此示例说明如何使用 a 来限制并发执行特定代码段的线程数,从而有效地管理有限的资源。SemaphoreSlim

5.监视器:锁定和同步 🔒

问题陈述:您需要同步对共享列表的访问,其中一个或多个线程可以添加项目,但访问必须是独占的,以确保线程安全。

使用 C# 示例的解决方案:

using System;  
using System.Collections.Generic;  
using System.Threading;  
  
class Program  
{  
    private static List<int> sharedList = new List<int>();  
    private static readonly object locker = new object();  
  
    static void Main(string[] args)  
    {  
        Thread writerThread = new Thread(AddToList);  
        writerThread.Start();  
  
        Thread readerThread = new Thread(ReadFromList);  
        readerThread.Start();  
    }  
  
    static void AddToList()  
    {  
        lock (locker) // Acquire the lock  
        {  
            for (int i = 0; i < 10; i++)  
            {  
                sharedList.Add(i);  
                Console.WriteLine($"Added {i} to the list");  
                Thread.Sleep(100); // Simulate work  
            }  
        }  
    }  
  
    static void ReadFromList()  
    {  
        lock (locker) // Acquire the lock  
        {  
            foreach (int i in sharedList)  
            {  
                Console.WriteLine($"Read {i} from the list");  
            }  
        }  
    }  
}

在此示例中,语句(内部使用 )用于确保一次只有一个线程可以访问共享列表,从而防止并发修改并确保线程安全。lockMonitor

6. ReaderWriterLockSlim:优化读/写访问 📖✍️

问题陈述:在涉及对共享资源的频繁读取操作和偶尔写入操作的应用程序中,需要通过同时允许多个读取器来优化性能,但编写器应具有独占访问权限。

使用 C# 示例的解决方案:

using System;  
using System.Threading;  
  
class Program  
{  
    private static ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();  
    private static int sharedResource = 0;  
  
    static void Main(string[] args)  
    {  
        Thread readThread1 = new Thread(ReadResource);  
        readThread1.Start();  
        Thread readThread2 = new Thread(ReadResource);  
        readThread2.Start();  
  
        Thread writeThread = new Thread(WriteResource);  
        writeThread.Start();  
  
        readThread1.Join();  
        readThread2.Join();  
        writeThread.Join();  
    }  
  
    static void ReadResource()  
    {  
        rwLock.EnterReadLock();  
        try  
        {  
            Console.WriteLine($"Read value: {sharedResource}");  
            Thread.Sleep(500); // Simulate reading work  
        }  
        finally  
        {  
            rwLock.ExitReadLock();  
        }  
    }  
  
    static void WriteResource()  
    {  
        rwLock.EnterWriteLock();  
        try  
        {  
            sharedResource = new Random().Next(100);  
            Console.WriteLine($"Wrote value: {sharedResource}");  
            Thread.Sleep(500); // Simulate writing work  
        }  
        finally  
        {  
            rwLock.ExitWriteLock();  
        }  
    }  
}

此示例展示了 ,它允许多个线程同时读取共享资源以提高效率,但确保写入操作是独占的,从而保持数据完整性。ReaderWriterLockSlim

高级同步技术和最佳实践 🔍

介绍 .NET Core 中的基本同步基元后,让我们更深入地了解高级同步技术和最佳做法,以便在更复杂的方案中管理线程。了解这些高级概念对于构建可靠、可扩展且无死锁的应用程序至关重要。🌟

7. Async/Await:异步同步 🔄

问题陈述:考虑应用程序执行 I/O 绑定操作(如数据库访问或文件读取/写入)的场景,这可能会阻塞线程,从而导致系统资源使用效率低下。

使用 C# 示例的解决方案:

using System;  
using System.Threading.Tasks;  
  
class Program  
{  
    static async Task Main(string[] args)  
    {  
        await AccessDatabaseAsync();  
        await ReadFileAsync();  
    }  
  
    static async Task AccessDatabaseAsync()  
    {  
        // Simulate database access  
        Console.WriteLine("Accessing database...");  
        await Task.Delay(1000); // Async wait without blocking the thread  
        Console.WriteLine("Database accessed.");  
    }  
  
    static async Task ReadFileAsync()  
    {  
        // Simulate file reading  
        Console.WriteLine("Reading file...");  
        await Task.Delay(1000); // Async wait without blocking the thread  
        Console.WriteLine("File read.");  
    }  
}

此示例演示如何使用 C# 中的 async/await 模式在不阻塞线程的情况下执行异步操作。这种方法提高了应用程序的可伸缩性和效率,尤其是在 I/O 密集型方案中。

8. 联锁:原子操作 🔒⚡

问题陈述:您需要执行简单的原子操作,例如递增共享计数器,而不会产生锁的开销。

使用 C# 示例的解决方案:

using System;  
using System.Threading;  
using System.Threading.Tasks;  
  
class Program  
{  
    private static int counter = 0;  
  
    static void Main(string[] args)  
    {  
        Parallel.For(0, 10000, _ =>  
        {  
            Interlocked.Increment(ref counter);  
        });  
  
        Console.WriteLine($"Counter value: {counter}");  
    }  
}

此示例使用该类同时安全地从多个线程递增共享计数器。 操作确保原子性,防止竞争条件,而无需显式锁定机制。InterlockedInterlocked

9. 障碍:同步并发操作 🚧

问题陈述:假设需要并行执行多个操作,但您必须等到所有操作达到某个点,然后才能继续进行任何操作。

使用 C# 示例的解决方案:

using System;  
using System.Threading;  
using System.Threading.Tasks;  
  
class Program  
{  
    static Barrier barrier = new Barrier(3, (b) =>  
    {  
        Console.WriteLine($"Phase {b.CurrentPhaseNumber} completed.");  
    });  
  
    static void Main(string[] args)  
    {  
        Task.Run(() => PerformPartOfOperation(1));  
        Task.Run(() => PerformPartOfOperation(2));  
        Task.Run(() => PerformPartOfOperation(3));  
  
        Console.ReadLine(); // Prevent the program from exiting immediately  
    }  
  
    static void PerformPartOfOperation(int part)  
    {  
        Console.WriteLine($"Performing part {part} of the operation");  
        // Simulate work  
        Thread.Sleep(new Random().Next(1000, 3000));  
        Console.WriteLine($"Part {part} of the operation completed");  
  
        barrier.SignalAndWait(); // Wait for other parts to complete  
    }  
}

此示例演示如何使用 a 来同步多个任务,确保所有任务在执行任何任务之前到达某个点。这在需要分阶段执行的并行算法中特别有用。Barrier

10. CountdownEvent:等待多个操作完成 🏁

问题陈述:在继续应用程序的下一步之前,需要完成多个异步操作,类似于联接线程,但方式更灵活、更可伸缩。

使用 C# 示例的解决方案:

using System;  
using System.Threading;  
using System.Threading.Tasks;  
  
class Program  
{  
    static CountdownEvent countdown = new CountdownEvent(3);  
  
    static void Main(string[] args)  
    {  
        Task.Run(() => PerformOperation("Operation 1"));  
        Task.Run(() => PerformOperation("Operation 2"));  
        Task.Run(() => PerformOperation("Operation 3"));  
  
        countdown.Wait(); // Wait for all operations to complete  
        Console.WriteLine("All operations completed.");  
  
        Console.ReadLine(); // Prevent the program from exiting immediately  
    }  
  
    static void PerformOperation(string name)  
    {  
        Console.WriteLine($"{name} started.");  
        // Simulate operation  
        Thread.Sleep(new Random().Next(1000, 5000));  
        Console.WriteLine($"{name} completed.");  
  
        countdown.Signal(); // Signal completion of the operation  
    }  
}

此示例演示了 的用法,以等待一定数量的操作完成,然后再继续。它提供了一种简单有效的方法来同步任务,尤其是在涉及异步操作或多个处理阶段的方案中。CountdownEvent

多线程应用程序的线程安全数据结构

1.并发收藏:

并发集合(如 ConcurrentQueue、ConcurrentDictionary 和 ConcurrentBag)是专为多线程应用程序设计的线程安全数据结构。它们允许多个线程操作和访问集合,而无需显式同步。

例:

using System.Collections.Concurrent;  
using System.Threading;  
using System.Threading.Tasks;  
  
public class ConcurrentCollectionsExample  
{  
    private static ConcurrentQueue<int> _queue = new ConcurrentQueue<int>();  
  
    public static void Main()  
    {  
        Task.WaitAll(  
            Task.Run(() => EnqueueData(1)),  
            Task.Run(() => EnqueueData(2)),  
            Task.Run(() => DequeueData())  
        );  
    }  
  
    private static void EnqueueData(int data)  
    {  
        _queue.Enqueue(data);  
    }  
  
    private static void DequeueData()  
    {  
        if (_queue.TryDequeue(out int data))  
        {  
            Console.WriteLine($"Dequeued: {data}");  
        }  
        else  
        {  
            Console.WriteLine("Queue is empty");  
        }  
    }  
}

在此示例中,ConcurrentQueue 用于以线程安全的方式存储和检索整数。多个线程可以对数据进行排队和取消排队,而无需显式同步,从而确保数据一致性和线程安全性。

2.原子操作和Thread.VolatileRead/Write

溶液:

默认情况下,原子操作(例如读取和写入不大于本机字大小的内置类型的变量(例如 int))是线程安全的。但是,可能需要使用 Thread.VolatileRead 和 Thread.VolatileWrite 方法来确保具有弱内存模型的多个处理器或内核之间的读取和写入可见性。

例:

public class AtomicOperationsExample  
{  
    private static int _counter = 0;  
  
    public static void Main()  
    {  
        Task.WaitAll(  
            Task.Run(() => IncrementCounter()),  
            Task.Run(() => IncrementCounter())  
        );  
  
        Console.WriteLine("Final counter value: " + _counter);  
    }  
  
    private static void IncrementCounter()  
    {  
        int currentValue = Thread.VolatileRead(ref _counter);  
        while (true)  
        {  
            int newValue = currentValue + 1;  
            int result = Interlocked.CompareExchange(ref _counter, newValue, currentValue);  
            if (result == currentValue)  
            {  
                break;  
            }  
            currentValue = result;  
        }  
    }  
}

在此示例中,Thread.VolatileRead 确保从主内存中读取 _counter 的最新值。Interlocked.CompareExchange 方法用于对 _counter 变量执行原子比较和交换操作,确保其值以原子方式和线程安全地更新。

3.使用 Thread.MemoryBarrier 对内存访问进行细粒度控制

Thread.MemoryBarrier 是一种低级方法,可确保编译器、运行时或硬件不会跨屏障对内存操作进行重新排序。当您需要对内存访问进行精细控制时,尤其是在处理弱内存模型或特定于处理器的优化时,可以使用它。

例:

public class MemoryBarrierExample  
{  
    private static bool _flag = false;  
    private static int _data = 0;  
  
    public static void Main()  
    {  
        Task.WaitAll(  
            Task.Run(() => UpdateFlagAndData(true, 1)),  
            Task.Run(() => UpdateFlagAndData(false, 2))  
        );  
  
        Console.WriteLine("Flag: " + _flag);  
        Console.WriteLine("Data: " + _data);  
    }  
  
    private static void UpdateFlagAndData(bool newFlagValue, int newDataValue)  
    {  
        _flag = newFlagValue;  
        Thread.MemoryBarrier();  
        _data = newDataValue;  
    }  
}

在此示例中,两个线程更新 _flag 和 _data 变量。Thread.MemoryBarrier 确保对 _flag 的写入不会随着对_data的写入而重新排序,从而保留跨屏障的内存操作顺序。

4.高级并发抽象:任务和并行性

溶液:

.NET Framework 提供了用于管理并发操作(如 Task 和 Parallel 类)的高级抽象,这有助于简化应用程序中的多线程处理、同步和并行性。

例:

using System.Threading.Tasks;  
  
public class ParallelExample  
{  
    public static void Main()  
{  
        int[] numbers = { 1, 2, 3, 4, 5 };  
  
        Parallel.For(0, numbers.Length, i =>  
        {  
            Console.WriteLine($"Task {Task.CurrentId} is processing number {numbers[i]}");  
        });  
    }  
}

在此示例中,Parallel.For 方法用于并发处理 numbers 数组的元素。该框架可自动管理基础任务的创建、调度和同步,从而简化应用程序中并行处理的实现。

5.用于基于事件和异步编程的反应式扩展(Rx)

Reactive Extensions (Rx) 是一个库,用于使用可观察序列和 LINQ 样式的查询运算符编写异步和基于事件的程序。它提供了对传统的基于事件和回调驱动的编程模型的强大抽象,使编写响应式和可扩展的应用程序变得更加容易。

例:

using System.Reactive.Linq;  
using System.Threading.Tasks;  
  
public class ReactiveExtensionsExample  
{  
    public static void Main()  
{  
        var observable = Observable.Interval(TimeSpan.FromSeconds(1));  
  
        using (observable.Subscribe(x => Console.WriteLine($"Received: {x}")))  
        {  
            Console.WriteLine("Waiting for events...");  
            Task.Delay(TimeSpan.FromSeconds(5)).Wait(); // Wait for 5 seconds  
}  
    }  
}

在此示例中,将创建一个 Observable.Interval,以按指定的间隔(1 秒)生成整数值序列。Subscribe 方法用于注册将处理生成值的观察器。程序等待 5 秒钟,在此期间观察者打印接收到的值。

6.并发任务之间的高性能数据交换通道

.NET Core 提供 Channel<T> 类,用于在并发任务之间创建高性能、线程安全的通信通道。通道对于生产者-消费者方案特别有用,其中多个生产者生成由多个使用者使用的数据。

例:

using System.Threading.Channels;  
using System.Threading.Tasks;  
  
public class ChannelExample  
{  
    public static void Main()  
    {  
        var channel = Channel.CreateBounded<int>(5);  
  
        Task.Run(() => Producer(channel));  
        Task.Run(() => Consumer(channel));  
  
        Task.WaitAll();  
    }  
  
    private static async Task Producer(Channel\<int> channel)  
    {  
        for (int i = 0; i < 10; i++)  
        {  
            await channel.Writer.WriteAsync(i);  
            Console.WriteLine($"Produced: {i}");  
        }  
    }  
  
    private static async Task Consumer(Channel\<int> channel)  
    {  
        while (await channel.Reader.WaitToReadAsync())  
        {  
            while (channel.Reader.TryRead(out int item))  
            {  
                Console.WriteLine($"Consumed: {item}");  
            }  
        }  
    }  
}

在此示例中,将创建一个容量为 5 的有界通道,用于在生产者和使用者任务之间交换整数值。生产者任务使用 Writer 属性生成值并将其写入通道,而使用者任务使用 Reader 属性从通道读取值。通道处理同步和缓冲,允许两个任务同时运行。

7.Actor Model with Akka.NET:构建分布式和弹性消息驱动系统

执行组件模型是一种用于构建并发应用程序的模式,这些应用程序使用在称为执行组件的轻量级独立处理单元之间传递异步消息。Akka.NET 是一个工具包和运行时,用于使用 Actor 模型构建分布式和弹性消息驱动系统。

例:

首先,安装 Akka.NET NuGet 包:

dotnet add package Akkausing Akka.Actor; 
using System;  
  
public class ActorExample  
{  
    public static void Main()  
   {  
        var actorSystem = ActorSystem.Create("MyActorSystem");  
        var actor = actorSystem.ActorOf<MyActor>("myActor");  
  
        for (int i = 0; i < 5; i++)  
        {  
            actor.Tell(i);  
        }  
  
        Console.ReadLine(); // Keep the program running  
    }  
}  
  
public class MyActor : ActorBase  
{  
    protected override bool Receive(object message)  
    {  
        if (message is int number)  
        {  
            Console.WriteLine($"Received: {number}");  
            return true;  
        }  
  
        return false;  
    }  
}

在此示例中,将创建一个 Akka.NET 执行组件系统,并在系统中生成 MyActor 类的实例作为执行组件。Main 方法使用 Tell 方法将整数消息发送给执行组件,执行组件在重写的 Receive 方法中处理这些消息。

8.使用 TPL 数据流库的 Dataflow:构建健壮且响应迅速的数据处理管道

任务并行库 (TPL) 数据流提供用于构建数据处理管道和网络的组件和模式。它简化了处理异步和基于事件的数据处理的并发应用程序的开发。

例:

首先,安装 TPL Dataflow NuGet 包:

dotnet add package System.Threading.Tasks.Dataflowusing 
System.Threading.Tasks.Dataflow;  
  
public class DataflowExample  
{  
    public static void Main()  
    {  
        var actionBlock = new ActionBlock<int>(  
            x => Console.WriteLine($"Processed: {x}"),  
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });  
  
        for (int i = 0; i < 10; i++)  
        {  
            actionBlock.Post(i);  
        }  
  
        actionBlock.Complete();  
        actionBlock.Completion.Wait();  
    }  
}

在此示例中,将创建一个 ActionBlock 以异步处理最大并行度为 2 的整数值。Main 方法将值发布到块,然后在等待处理所有已发布的值之前发出完成信号。

其他注意事项和最佳实践

选择正确的同步基元

  • 分析您的需求:了解应用程序的特定需求,例如读/写操作的频率、性能与数据安全的重要性以及涉及的线程数。
  • 首选更高级别的构造:尽可能使用高级构造,例如 I/O 绑定操作或集合,这些构造封装了线程同步的复杂性。async/awaitConcurrent

避免常见陷阱

  • 死锁:当两个或多个线程相互等待释放资源时,会发生死锁。为避免死锁,请确保以一致的顺序获取锁,并考虑使用超时进行锁获取。
  • 饥饿:确保当其他线程继续运行时,没有线程永远等待对资源的访问。资源分配的公平性(可能通过与队列一起使用)可以缓解此问题。SemaphoreSlim
  • 错误共享:当多个线程对存储在内存中(在同一缓存行内)的数据进行操作时,这可能会导致性能下降。请注意数据结构布局,并考虑填充或对齐策略以减少错误共享。

性能注意事项

  • 最小化锁争用:当多个线程试图获取另一个线程持有的锁时,就会发生锁争用,从而导致线程挂起和上下文切换。为了最大程度地减少争用,请尽可能缩短锁定部分,并考虑在适当的情况下使用读/写锁或原子操作。
  • 可伸缩性:随着线程数的增加,管理线程同步的开销可能会成为瓶颈。在预期的负载条件下测试应用程序,并在选择同步基元时考虑可伸缩性。

测试和调试

  • 并发测试:并发问题可能难以重现和调试。使用任务并行库 (TPL) 数据流块等工具进行结构化并发,或使用 Visual Studio 并发可视化工具分析和调试并发应用程序。
  • 静态分析工具:利用静态分析工具检测代码库中的潜在死锁、争用条件和其他并发问题。

实际应用

考虑一个实时股票交易平台,其中高吞吐量和低延迟至关重要。该应用程序每秒处理数千个买入/卖出订单,每个订单都需要访问共享的财务数据。有效的同步策略可能涉及:

  • ReaderWriterLockSlim 用于财务数据访问,允许在序列化写入时进行多个并发读取,以确保数据完整性。
  • Async/await 用于非阻塞 I/O 操作(如数据库访问或网络调用),以提高可伸缩性。
  • 用于更新关键计数器(例如已处理的订单总数)的联锁操作,以确保原子性,而不会产生锁定开销。
  • 并发集合,如 ,用于管理由多个线程频繁更新的共享状态,提供线程安全操作,而无需显式锁定。ConcurrentDictionary

结论🎉

.NET Core 中的线程同步对于开发可靠且高效的多线程应用程序至关重要。通过了解和利用 .NET Core 提供的同步基元(如互斥锁、信号量、监视器和 ReaderWriterLockSlim),开发人员可以确保安全访问共享资源,从而防止争用条件、死锁和数据损坏。这些示例为在实际应用中实现线程同步奠定了基础,从而在并发编程中提供了性能和安全性。

相关留言评论
昵称:
邮箱:
阅读排行