C#中EFCore的并发处理实例讲解

作者:微信公众号:【架构师老卢】
4-18 8:24
26

概述:在EF Core 中,并发处理是框架管理多个用户或进程尝试同时更改相同数据的情况的方式。它主要通过乐观并发控制来实现这一点,该控制检测数据自上次访问以来是否发生了变化。 如果检测到更改,EF Core 将引发冲突。然后,开发人员可以决定如何处理冲突,例如合并更改或通知用户。虽然 EF Core 对此有所帮助,但开发人员可能仍需要为特定方案编写自定义代码,例如处理旧系统或优化性能。实体框架核心 (EF Core) 确实提供了某种级别的并发处理,但了解它的作用和工作原理非常重要。乐观并发EF Core 主要使用乐观并发控制。这意味着它不会在读取操作期间锁定行或表。相反,它会检测数据自上次查询以来是

在EF Core 中,并发处理是框架管理多个用户或进程尝试同时更改相同数据的情况的方式。它主要通过乐观并发控制来实现这一点,该控制检测数据自上次访问以来是否发生了变化如果检测到更改,EF Core 将引发冲突。然后,开发人员可以决定如何处理冲突,例如合并更改或通知用户。虽然 EF Core 对此有所帮助,但开发人员可能仍需要为特定方案编写自定义代码,例如处理旧系统或优化性能。

实体框架核心 (EF Core) 确实提供了某种级别的并发处理,但了解它的作用和工作原理非常重要。

乐观并发

EF Core 主要使用乐观并发控制。这意味着它不会在读取操作期间锁定行或表。相反,它会检测数据自上次查询以来是否已更改。检索实体时,EF Core 会跟踪其原始值。尝试更新该实体时,EF Core 会将数据库中的当前值与原始值进行比较。如果它们不同,则表示另一个进程修改了数据,并且 EF Core 可能会引发并发异常

发生并发冲突时,EF Core 可以引发 DbUpdateConcurrencyException。可以在代码中处理此异常以决定如何继续,例如通过刷新数据、合并更改或通知用户冲突

EF Core 支持在更新和删除期间自动包含在 SQL WHERE 子句中的并发令牌(例如:Timestamp 或 RowVersion 列)。这些令牌使 EF Core 能够更可靠地检测并发冲突。

虽然 EF Core 提供了处理并发的机制,但对于开发人员来说,了解并发的工作原理并为其应用程序实现适当的策略仍然很重要。

手动处理并发的重要性

自定义冲突解决逻辑

EF Core 的自动并发处理可能并不总是符合应用程序的特定要求。开发人员可能需要实现超出 EF Core 提供的自定义冲突解决逻辑。例如,他们可能希望合并来自不同来源的更改、提示用户做出决策或应用业务规则来解决冲突

细粒度控制

在某些情况下,开发人员可能需要更好地控制如何检测和解决并发冲突。他们可能希望在应用更改之前执行其他检查或验证。

与外部系统集成

与不遵循 EF Core 约定或不完全支持其功能的外部系统或旧数据库集成时,开发人员可能需要实现自定义并发处理机制,以确保跨系统的一致性和数据完整性。

旧代码兼容性

在涉及遗留代码或数据库的项目中,开发人员可能需要实现手动并发处理,以保持与现有系统的兼容性或解决旧技术的限制。

性能优化

在某些情况下,EF Core 的自动并发处理机制可能会产生性能开销,尤其是在处理大型数据集或高并发负载时。开发人员可以选择实现手动并发处理,以优化性能并减少开销

让我们深入了解一些真实世界的场景

自定义冲突解决逻辑

想象一下,在电子商务应用程序中,两个用户同时尝试购买限量库存产品的最后一件商品。虽然 EF Core 可以检测并发冲突,但开发人员可能需要实现自定义逻辑来决定如何处理这种情况,例如对某些用户进行优先级排序、通知他们冲突或允许他们竞价项目。

细粒度控制

银行应用程序中,在账户之间转移资金时,开发人员可能需要实现手动并发处理,以确保账户余额一致,并且不会因并发交易而损失资金。这可能涉及使用数据库锁或以比 EF Core 默认提供的更精细的粒度实现乐观并发检查。

与外部系统集成

考虑与传统患者管理系统集成的医疗保健应用程序。从两个系统同时更新患者记录时,开发人员可能需要实现自定义并发处理机制来同步数据并解决两个系统之间的冲突,从而确保患者信息在两个平台上保持准确和一致。

旧代码兼容性

从旧数据访问层迁移到 EF Core 的项目中,开发人员可能需要保持与旧代码中实现的现有并发处理机制或解决方法的兼容性。这可能涉及在 EF Core 中复制相同的并发逻辑,或逐步重构代码库以利用 EF Core 的内置并发功能。

性能优化

高吞吐量消息传递应用程序中,消息由多个用户或后台任务同时处理,开发人员可以选择实现手动并发处理,以优化性能并减少数据库争用。这可能涉及批处理更新、优化查询或使用存储过程来比 EF Core 的默认机制更有效地处理并发操作。

让我们模拟并发情况

数据库上下文

public class YourDbContext : DbContext  
{  
    public DbSet<Item> Items { get; set; }  
  
    protected override void OnModelCreating(ModelBuilder modelBuilder)  
    {  
        modelBuilder.Entity<Item>()  
            .Property(p => p.Timestamp)  
            .IsRowVersion();  
    }  
}

实体

public class Item  
{  
    public int Id { get; set; }  
    public string Name { get; set; }  
    public int Quantity { get; set; }  
    public byte[] Timestamp { get; set; } // Concurrency token  
}

存储 库

public class ItemRepository : IItemRepository
{
    public async Task UpdateItem()
    {
        // Add new item 
        var newItem = new Item { Name = "Example Item", Quantity = 10 };
        _context.Items.Add(newItem);
        await context.SaveChangesAsync();

        // Simulate concurrent updates for above item
        var task1 = Task.Run(async () => await UpdateItemQuantityAsync(1, 5)); // Reduce quantity by 5
        var task2 = Task.Run(async () => await UpdateItemQuantityAsync(1, 3)); // Reduce quantity by 3

        await Task.WhenAll(task1, task2);

        _log.info("Concurrent updates completed.");

        var item = await _context.Items.FindAsync(1);
        _log.info($"Item Name: {item.Name}, Quantity: {item.Quantity}");

    }

    public static async Task UpdateItemQuantityAsync(int itemId, int quantityChange)
    {
         var item = await _context.Items.FindAsync(itemId);
         if (item != null)
         {
             // Simulate some processing time
             await Task.Delay(100);

             item.Quantity -= quantityChange;
             try
             {
                 await context.SaveChangesAsync();
             }
             catch (DbUpdateConcurrencyException ex)
             {
                 // Custom conflict resolution logic
             }
        }
    }
}

使用自定义冲突解决逻辑处理并发

自定义冲突解决逻辑

public async Task<bool> PurchaseItemAsync(int itemId, string userId)
{
    using (var context = new YourDbContext())
    {
        var item = await context.Items.FindAsync(itemId);

        if (item.Quantity > 0)
        {
            item.Quantity--;
            await context.SaveChangesAsync();
            return true; // Purchase successful
        }
        else
        {
            // Custom conflict resolution logic: Allow user to bid for the item
            var highestBid = await context.Bids
                .Where(b => b.ItemId == itemId)
                .OrderByDescending(b => b.Amount)
                .FirstOrDefaultAsync();

            if (highestBid != null && highestBid.Amount > 0)
            {
                // Inform user about the highest bid
                return false;
            }
            else
            {
                // Notify user about insufficient stock
                return false; // Purchase failed due to insufficient stock
            }
        }
    }
}

细粒度控制

public async Task TransferFundsAsync(int sourceAccountId, int destinationAccountId, decimal amount)
{
    using (var context = new YourDbContext())
    {
        using (var transaction = await context.Database.BeginTransactionAsync())
        {
            var sourceAccount = await context.Accounts.FindAsync(sourceAccountId);
            var destinationAccount = await context.Accounts.FindAsync(destinationAccountId);

            if (sourceAccount.Balance >= amount)
            {
                sourceAccount.Balance -= amount;
                destinationAccount.Balance += amount;

                await context.SaveChangesAsync();
                await transaction.CommitAsync();
            }
            else
            {
                // Custom conflict resolution logic: Notify user about insufficient funds
                throw new InvalidOperationException("Transfer failed: Insufficient funds.");
            }
        }
    }
}

与外部系统集成

public async Task UpdatePatientAsync(Patient updatedPatient)  
{  
    using (var context = new YourDbContext())  
    {  
        var existingPatient = await context.Patients.FirstOrDefaultAsync(p => p.Id == updatedPatient.Id);  
  
        if (existingPatient != null)  
        {  
            // Merge changes from updatedPatient into existingPatient  
            existingPatient.Name = updatedPatient.Name;  
            existingPatient.Age = updatedPatient.Age;  
            // Update other properties as needed  
  
            await context.SaveChangesAsync();  
        }  
        else  
        {  
            // Custom conflict resolution logic: Handle the conflict with the legacy system  
            throw new InvalidOperationException("Update failed: Patient not found in the legacy system.");  
        }  
    }  
}

性能优化

public async Task ProcessMessageAsync(Message message)  
{  
    using (var context = new YourDbContext())  
    {  
        // Perform message processing logic  
  
        // Save changes periodically or in batches  
        if (ShouldSaveChanges()) // Implement your logic here  
        {  
            await context.SaveChangesAsync();  
        }  
    }  
}
相关留言评论
昵称:
邮箱:
阅读排行