.NET Core 中的线程同步是开发高性能并发应用程序的关键方面。它确保多个线程可以安全地访问资源,而不会导致数据损坏或不一致。本文深入探讨了 .NET Core 中线程同步的核心概念,为各种方案提供了深入的说明和 C# 编码示例。🚀
在多线程应用程序中,线程通常需要与共享资源(如数据结构、文件或数据库)进行交互。如果没有适当的同步,多个线程的并发访问可能会导致争用条件、死锁和数据损坏等问题。
.NET Core 提供了多个同步基元来处理不同的同步需求:.NET Core provides several synchronization primitives to handle different synchronization needs:
问题陈述:假设您有一个关键代码段用于更新共享资源(例如文件或数据库条目),并且您需要确保一次只有一个线程可以执行此部分以防止数据损坏。
使用 C# 示例的解决方案:
using System;
using System.Threading;
class Program
{
private static Mutex mutex = new Mutex();
static void Main(string[] args)
{
for (int i = 0; i < 5; i++)
{
Thread t = new Thread(WriteToSharedResource);
t.Name = $"Thread{i + 1}";
t.Start();
}
}
static void WriteToSharedResource()
{
Console.WriteLine($"{Thread.CurrentThread.Name} is requesting the mutex");
mutex.WaitOne(); // Requesting the mutex
Console.WriteLine($"{Thread.CurrentThread.Name} has entered the critical section");
// Simulate work (e.g., writing to a shared resource)
Thread.Sleep(1000);
Console.WriteLine($"{Thread.CurrentThread.Name} is leaving the critical section");
mutex.ReleaseMutex(); // Releasing the mutex
}
}
此示例演示如何使用 a 来确保一次只有一个线程可以访问关键部分,从而防止并发访问问题。Mutex
问题陈述:假设您有一个处理传入请求的应用程序,但由于资源限制(例如,数据库连接),您只能同时处理一定数量的请求。
使用 C# 示例的解决方案:
using System;
using System.Threading;
class Program
{
static SemaphoreSlim semaphore = new SemaphoreSlim(3); // Allows up to 3 threads to enter
static void Main(string[] args)
{
for (int i = 0; i < 10; i++)
{
Thread t = new Thread(EnterSemaphore);
t.Start(i);
}
}
static void EnterSemaphore(object id)
{
Console.WriteLine($"Request {id} is waiting to enter");
semaphore.Wait(); // Request to enter the semaphore
Console.WriteLine($"Request {id} has entered");
// Simulating work
Thread.Sleep(1000);
Console.WriteLine($"Request {id} is leaving");
semaphore.Release(); // Release the semaphore
}
}
此示例说明如何使用 a 来限制并发执行特定代码段的线程数,从而有效地管理有限的资源。SemaphoreSlim
问题陈述:您需要同步对共享列表的访问,其中一个或多个线程可以添加项目,但访问必须是独占的,以确保线程安全。
使用 C# 示例的解决方案:
using System;
using System.Collections.Generic;
using System.Threading;
class Program
{
private static List<int> sharedList = new List<int>();
private static readonly object locker = new object();
static void Main(string[] args)
{
Thread writerThread = new Thread(AddToList);
writerThread.Start();
Thread readerThread = new Thread(ReadFromList);
readerThread.Start();
}
static void AddToList()
{
lock (locker) // Acquire the lock
{
for (int i = 0; i < 10; i++)
{
sharedList.Add(i);
Console.WriteLine($"Added {i} to the list");
Thread.Sleep(100); // Simulate work
}
}
}
static void ReadFromList()
{
lock (locker) // Acquire the lock
{
foreach (int i in sharedList)
{
Console.WriteLine($"Read {i} from the list");
}
}
}
}
在此示例中,语句(内部使用 )用于确保一次只有一个线程可以访问共享列表,从而防止并发修改并确保线程安全。lockMonitor
问题陈述:在涉及对共享资源的频繁读取操作和偶尔写入操作的应用程序中,需要通过同时允许多个读取器来优化性能,但编写器应具有独占访问权限。
使用 C# 示例的解决方案:
using System;
using System.Threading;
class Program
{
private static ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
private static int sharedResource = 0;
static void Main(string[] args)
{
Thread readThread1 = new Thread(ReadResource);
readThread1.Start();
Thread readThread2 = new Thread(ReadResource);
readThread2.Start();
Thread writeThread = new Thread(WriteResource);
writeThread.Start();
readThread1.Join();
readThread2.Join();
writeThread.Join();
}
static void ReadResource()
{
rwLock.EnterReadLock();
try
{
Console.WriteLine($"Read value: {sharedResource}");
Thread.Sleep(500); // Simulate reading work
}
finally
{
rwLock.ExitReadLock();
}
}
static void WriteResource()
{
rwLock.EnterWriteLock();
try
{
sharedResource = new Random().Next(100);
Console.WriteLine($"Wrote value: {sharedResource}");
Thread.Sleep(500); // Simulate writing work
}
finally
{
rwLock.ExitWriteLock();
}
}
}
此示例展示了 ,它允许多个线程同时读取共享资源以提高效率,但确保写入操作是独占的,从而保持数据完整性。ReaderWriterLockSlim
介绍 .NET Core 中的基本同步基元后,让我们更深入地了解高级同步技术和最佳做法,以便在更复杂的方案中管理线程。了解这些高级概念对于构建可靠、可扩展且无死锁的应用程序至关重要。🌟
问题陈述:考虑应用程序执行 I/O 绑定操作(如数据库访问或文件读取/写入)的场景,这可能会阻塞线程,从而导致系统资源使用效率低下。
使用 C# 示例的解决方案:
using System;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
await AccessDatabaseAsync();
await ReadFileAsync();
}
static async Task AccessDatabaseAsync()
{
// Simulate database access
Console.WriteLine("Accessing database...");
await Task.Delay(1000); // Async wait without blocking the thread
Console.WriteLine("Database accessed.");
}
static async Task ReadFileAsync()
{
// Simulate file reading
Console.WriteLine("Reading file...");
await Task.Delay(1000); // Async wait without blocking the thread
Console.WriteLine("File read.");
}
}
此示例演示如何使用 C# 中的 async/await 模式在不阻塞线程的情况下执行异步操作。这种方法提高了应用程序的可伸缩性和效率,尤其是在 I/O 密集型方案中。
问题陈述:您需要执行简单的原子操作,例如递增共享计数器,而不会产生锁的开销。
使用 C# 示例的解决方案:
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
private static int counter = 0;
static void Main(string[] args)
{
Parallel.For(0, 10000, _ =>
{
Interlocked.Increment(ref counter);
});
Console.WriteLine($"Counter value: {counter}");
}
}
此示例使用该类同时安全地从多个线程递增共享计数器。 操作确保原子性,防止竞争条件,而无需显式锁定机制。InterlockedInterlocked
问题陈述:假设需要并行执行多个操作,但您必须等到所有操作达到某个点,然后才能继续进行任何操作。
使用 C# 示例的解决方案:
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static Barrier barrier = new Barrier(3, (b) =>
{
Console.WriteLine($"Phase {b.CurrentPhaseNumber} completed.");
});
static void Main(string[] args)
{
Task.Run(() => PerformPartOfOperation(1));
Task.Run(() => PerformPartOfOperation(2));
Task.Run(() => PerformPartOfOperation(3));
Console.ReadLine(); // Prevent the program from exiting immediately
}
static void PerformPartOfOperation(int part)
{
Console.WriteLine($"Performing part {part} of the operation");
// Simulate work
Thread.Sleep(new Random().Next(1000, 3000));
Console.WriteLine($"Part {part} of the operation completed");
barrier.SignalAndWait(); // Wait for other parts to complete
}
}
此示例演示如何使用 a 来同步多个任务,确保所有任务在执行任何任务之前到达某个点。这在需要分阶段执行的并行算法中特别有用。Barrier
问题陈述:在继续应用程序的下一步之前,需要完成多个异步操作,类似于联接线程,但方式更灵活、更可伸缩。
使用 C# 示例的解决方案:
using System;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static CountdownEvent countdown = new CountdownEvent(3);
static void Main(string[] args)
{
Task.Run(() => PerformOperation("Operation 1"));
Task.Run(() => PerformOperation("Operation 2"));
Task.Run(() => PerformOperation("Operation 3"));
countdown.Wait(); // Wait for all operations to complete
Console.WriteLine("All operations completed.");
Console.ReadLine(); // Prevent the program from exiting immediately
}
static void PerformOperation(string name)
{
Console.WriteLine($"{name} started.");
// Simulate operation
Thread.Sleep(new Random().Next(1000, 5000));
Console.WriteLine($"{name} completed.");
countdown.Signal(); // Signal completion of the operation
}
}
此示例演示了 的用法,以等待一定数量的操作完成,然后再继续。它提供了一种简单有效的方法来同步任务,尤其是在涉及异步操作或多个处理阶段的方案中。CountdownEvent
并发集合(如 ConcurrentQueue、ConcurrentDictionary 和 ConcurrentBag)是专为多线程应用程序设计的线程安全数据结构。它们允许多个线程操作和访问集合,而无需显式同步。
例:
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class ConcurrentCollectionsExample
{
private static ConcurrentQueue<int> _queue = new ConcurrentQueue<int>();
public static void Main()
{
Task.WaitAll(
Task.Run(() => EnqueueData(1)),
Task.Run(() => EnqueueData(2)),
Task.Run(() => DequeueData())
);
}
private static void EnqueueData(int data)
{
_queue.Enqueue(data);
}
private static void DequeueData()
{
if (_queue.TryDequeue(out int data))
{
Console.WriteLine($"Dequeued: {data}");
}
else
{
Console.WriteLine("Queue is empty");
}
}
}
在此示例中,ConcurrentQueue 用于以线程安全的方式存储和检索整数。多个线程可以对数据进行排队和取消排队,而无需显式同步,从而确保数据一致性和线程安全性。
溶液:
默认情况下,原子操作(例如读取和写入不大于本机字大小的内置类型的变量(例如 int))是线程安全的。但是,可能需要使用 Thread.VolatileRead 和 Thread.VolatileWrite 方法来确保具有弱内存模型的多个处理器或内核之间的读取和写入可见性。
例:
public class AtomicOperationsExample
{
private static int _counter = 0;
public static void Main()
{
Task.WaitAll(
Task.Run(() => IncrementCounter()),
Task.Run(() => IncrementCounter())
);
Console.WriteLine("Final counter value: " + _counter);
}
private static void IncrementCounter()
{
int currentValue = Thread.VolatileRead(ref _counter);
while (true)
{
int newValue = currentValue + 1;
int result = Interlocked.CompareExchange(ref _counter, newValue, currentValue);
if (result == currentValue)
{
break;
}
currentValue = result;
}
}
}
在此示例中,Thread.VolatileRead 确保从主内存中读取 _counter 的最新值。Interlocked.CompareExchange 方法用于对 _counter 变量执行原子比较和交换操作,确保其值以原子方式和线程安全地更新。
Thread.MemoryBarrier 是一种低级方法,可确保编译器、运行时或硬件不会跨屏障对内存操作进行重新排序。当您需要对内存访问进行精细控制时,尤其是在处理弱内存模型或特定于处理器的优化时,可以使用它。
例:
public class MemoryBarrierExample
{
private static bool _flag = false;
private static int _data = 0;
public static void Main()
{
Task.WaitAll(
Task.Run(() => UpdateFlagAndData(true, 1)),
Task.Run(() => UpdateFlagAndData(false, 2))
);
Console.WriteLine("Flag: " + _flag);
Console.WriteLine("Data: " + _data);
}
private static void UpdateFlagAndData(bool newFlagValue, int newDataValue)
{
_flag = newFlagValue;
Thread.MemoryBarrier();
_data = newDataValue;
}
}
在此示例中,两个线程更新 _flag 和 _data 变量。Thread.MemoryBarrier 确保对 _flag 的写入不会随着对_data的写入而重新排序,从而保留跨屏障的内存操作顺序。
溶液:
.NET Framework 提供了用于管理并发操作(如 Task 和 Parallel 类)的高级抽象,这有助于简化应用程序中的多线程处理、同步和并行性。
例:
using System.Threading.Tasks;
public class ParallelExample
{
public static void Main()
{
int[] numbers = { 1, 2, 3, 4, 5 };
Parallel.For(0, numbers.Length, i =>
{
Console.WriteLine($"Task {Task.CurrentId} is processing number {numbers[i]}");
});
}
}
在此示例中,Parallel.For 方法用于并发处理 numbers 数组的元素。该框架可自动管理基础任务的创建、调度和同步,从而简化应用程序中并行处理的实现。
Reactive Extensions (Rx) 是一个库,用于使用可观察序列和 LINQ 样式的查询运算符编写异步和基于事件的程序。它提供了对传统的基于事件和回调驱动的编程模型的强大抽象,使编写响应式和可扩展的应用程序变得更加容易。
例:
using System.Reactive.Linq;
using System.Threading.Tasks;
public class ReactiveExtensionsExample
{
public static void Main()
{
var observable = Observable.Interval(TimeSpan.FromSeconds(1));
using (observable.Subscribe(x => Console.WriteLine($"Received: {x}")))
{
Console.WriteLine("Waiting for events...");
Task.Delay(TimeSpan.FromSeconds(5)).Wait(); // Wait for 5 seconds
}
}
}
在此示例中,将创建一个 Observable.Interval,以按指定的间隔(1 秒)生成整数值序列。Subscribe 方法用于注册将处理生成值的观察器。程序等待 5 秒钟,在此期间观察者打印接收到的值。
.NET Core 提供 Channel<T> 类,用于在并发任务之间创建高性能、线程安全的通信通道。通道对于生产者-消费者方案特别有用,其中多个生产者生成由多个使用者使用的数据。
例:
using System.Threading.Channels;
using System.Threading.Tasks;
public class ChannelExample
{
public static void Main()
{
var channel = Channel.CreateBounded<int>(5);
Task.Run(() => Producer(channel));
Task.Run(() => Consumer(channel));
Task.WaitAll();
}
private static async Task Producer(Channel\<int> channel)
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
Console.WriteLine($"Produced: {i}");
}
}
private static async Task Consumer(Channel\<int> channel)
{
while (await channel.Reader.WaitToReadAsync())
{
while (channel.Reader.TryRead(out int item))
{
Console.WriteLine($"Consumed: {item}");
}
}
}
}
在此示例中,将创建一个容量为 5 的有界通道,用于在生产者和使用者任务之间交换整数值。生产者任务使用 Writer 属性生成值并将其写入通道,而使用者任务使用 Reader 属性从通道读取值。通道处理同步和缓冲,允许两个任务同时运行。
执行组件模型是一种用于构建并发应用程序的模式,这些应用程序使用在称为执行组件的轻量级独立处理单元之间传递异步消息。Akka.NET 是一个工具包和运行时,用于使用 Actor 模型构建分布式和弹性消息驱动系统。
例:
首先,安装 Akka.NET NuGet 包:
dotnet add package Akkausing Akka.Actor;
using System;
public class ActorExample
{
public static void Main()
{
var actorSystem = ActorSystem.Create("MyActorSystem");
var actor = actorSystem.ActorOf<MyActor>("myActor");
for (int i = 0; i < 5; i++)
{
actor.Tell(i);
}
Console.ReadLine(); // Keep the program running
}
}
public class MyActor : ActorBase
{
protected override bool Receive(object message)
{
if (message is int number)
{
Console.WriteLine($"Received: {number}");
return true;
}
return false;
}
}
在此示例中,将创建一个 Akka.NET 执行组件系统,并在系统中生成 MyActor 类的实例作为执行组件。Main 方法使用 Tell 方法将整数消息发送给执行组件,执行组件在重写的 Receive 方法中处理这些消息。
任务并行库 (TPL) 数据流提供用于构建数据处理管道和网络的组件和模式。它简化了处理异步和基于事件的数据处理的并发应用程序的开发。
例:
首先,安装 TPL Dataflow NuGet 包:
dotnet add package System.Threading.Tasks.Dataflowusing
System.Threading.Tasks.Dataflow;
public class DataflowExample
{
public static void Main()
{
var actionBlock = new ActionBlock<int>(
x => Console.WriteLine($"Processed: {x}"),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
for (int i = 0; i < 10; i++)
{
actionBlock.Post(i);
}
actionBlock.Complete();
actionBlock.Completion.Wait();
}
}
在此示例中,将创建一个 ActionBlock 以异步处理最大并行度为 2 的整数值。Main 方法将值发布到块,然后在等待处理所有已发布的值之前发出完成信号。
考虑一个实时股票交易平台,其中高吞吐量和低延迟至关重要。该应用程序每秒处理数千个买入/卖出订单,每个订单都需要访问共享的财务数据。有效的同步策略可能涉及:
.NET Core 中的线程同步对于开发可靠且高效的多线程应用程序至关重要。通过了解和利用 .NET Core 提供的同步基元(如互斥锁、信号量、监视器和 ReaderWriterLockSlim),开发人员可以确保安全访问共享资源,从而防止争用条件、死锁和数据损坏。这些示例为在实际应用中实现线程同步奠定了基础,从而在并发编程中提供了性能和安全性。