EF Core 与 MySQL:事务和并发处理详解

本文将详细讲解EF Core与MySQL的事务和并发处理,分为三个部分:使用事务、处理并发冲突(乐观并发)以及悲观并发(MySQL中使用锁)。

  • 使用事务
    在EF Core中,可以使用事务来确保一系列操作要么全部成功,要么全部失败。EF Core支持多种事务管理方式,包括自动事务(SaveChanges自动包装事务)和显式事务。

  • 处理并发冲突(乐观并发)
    乐观并发假设多个事务很少冲突,因此不会立即锁定资源。而是在更新时检查数据是否被其他事务修改过。EF Core通过配置并发令牌(Concurrency Token)来实现乐观并发。

  • 悲观并发(MySQL中使用锁)
    悲观并发假设冲突经常发生,因此在读取数据时就会锁定资源,防止其他事务修改。在MySQL中,可以通过使用事务和SELECT … FOR UPDATE语句来实现悲观并发。

1. 使用事务

基本事务用法

// 使用DbContext的Database.BeginTransaction()
using (var transaction = context.Database.BeginTransaction())
{
    try
    {
        // 执行多个数据库操作
        var product = new Product { Name = "New Product", Price = 99.99m, Stock = 10 };
        context.Products.Add(product);
        
        var order = new Order { CustomerId = 1, OrderDate = DateTime.Now };
        context.Orders.Add(order);
        
        // 保存更改
        context.SaveChanges();
        
        // 提交事务
        transaction.Commit();
    }
    catch (Exception)
    {
        // 回滚事务
        transaction.Rollback();
        throw;
    }
}

使用TransactionScope(分布式事务)

// 需要安装System.Transactions包
using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
    try
    {
        // 多个数据库操作
        using (var context1 = new ApplicationDbContext())
        using (var context2 = new ApplicationDbContext())
        {
            context1.Products.Add(new Product { Name = "Product 1", Price = 10.99m });
            context2.Orders.Add(new Order { CustomerId = 1, OrderDate = DateTime.Now });
            
            await context1.SaveChangesAsync();
            await context2.SaveChangesAsync();
        }
        
        // 完成事务
        scope.Complete();
    }
    catch (Exception)
    {
        // 事务会自动回滚
        throw;
    }
}

设置事务隔离级别

using (var transaction = context.Database.BeginTransaction(IsolationLevel.ReadCommitted))
{
    try
    {
        // 执行操作
        context.Products.Add(new Product { Name = "New Product", Price = 99.99m });
        context.SaveChanges();
        
        transaction.Commit();
    }
    catch (Exception)
    {
        transaction.Rollback();
        throw;
    }
}

异步事务处理

public async Task<bool> ProcessOrderAsync(int productId, int quantity)
{
    using (var transaction = await context.Database.BeginTransactionAsync())
    {
        try
        {
            // 检查库存
            var product = await context.Products
                .FirstOrDefaultAsync(p => p.Id == productId);
                
            if (product == null || product.Stock < quantity)
                return false;
            
            // 减少库存
            product.Stock -= quantity;
            context.Products.Update(product);
            
            // 创建订单
            var order = new Order 
            { 
                ProductId = productId, 
                Quantity = quantity, 
                OrderDate = DateTime.Now 
            };
            context.Orders.Add(order);
            
            await context.SaveChangesAsync();
            await transaction.CommitAsync();
            
            return true;
        }
        catch (Exception)
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
}

2. 处理并发冲突(乐观并发)

配置并发令牌

// 在实体中定义并发令牌属性
public class Product
{
    public int Id { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
    public int Stock { get; set; }
    
    // 使用时间戳作为并发令牌
    [Timestamp]
    public byte[] Version { get; set; }
    
    // 或者使用自定义的并发令牌
    // public uint RowVersion { get; set; }
}

// 在DbContext中配置并发令牌
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    modelBuilder.Entity<Product>()
        .Property(p => p.Version)
        .IsRowVersion()
        .IsConcurrencyToken();
        
    // 或者使用非时间戳字段作为并发令牌
    modelBuilder.Entity<Product>()
        .Property(p => p.Name)
        .IsConcurrencyToken();
}

处理并发异常

public async Task<bool> UpdateProductAsync(Product updatedProduct)
{
    try
    {
        context.Products.Update(updatedProduct);
        await context.SaveChangesAsync();
        return true;
    }
    catch (DbUpdateConcurrencyException ex)
    {
        // 处理并发冲突
        foreach (var entry in ex.Entries)
        {
            if (entry.Entity is Product)
            {
                // 获取数据库中的当前值
                var databaseValues = await entry.GetDatabaseValuesAsync();
                
                if (databaseValues == null)
                {
                    // 实体已被删除
                    return false;
                }
                
                // 转换为实体
                var databaseProduct = (Product)databaseValues.ToObject();
                
                // 决定如何解决冲突
                // 选项1: 使用客户端值
                // entry.OriginalValues.SetValues(databaseValues);
                // await context.SaveChangesAsync();
                
                // 选项2: 使用数据库值
                // entry.CurrentValues.SetValues(databaseValues);
                
                // 选项3: 合并值
                var currentValues = entry.CurrentValues;
                var resolvedValues = currentValues.Clone();
                
                // 合并逻辑示例
                resolvedValues["Name"] = currentValues["Name"]; // 保留新名称
                resolvedValues["Price"] = databaseValues["Price"]; // 使用数据库价格
                resolvedValues["Stock"] = Math.Max(
                    (int)currentValues["Stock"], 
                    (int)databaseValues["Stock"]); // 使用较大库存
                
                // 设置解析后的值
                entry.OriginalValues.SetValues(databaseValues);
                entry.CurrentValues.SetValues(resolvedValues);
                
                // 重试保存
                await context.SaveChangesAsync();
                return true;
            }
        }
        
        return false;
    }
}

自定义并发冲突解决策略

public class ConcurrencyHandler
{
    public static async Task<bool> ResolveConcurrencyAsync(
        DbUpdateConcurrencyException ex, 
        ApplicationDbContext context)
    {
        var saved = false;
        
        foreach (var entry in ex.Entries)
        {
            // 获取数据库当前值
            var databaseValues = await entry.GetDatabaseValuesAsync();
            
            if (databaseValues == null)
            {
                // 实体已被删除
                entry.State = EntityState.Detached;
                continue;
            }
            
            // 获取原始值和当前值
            var originalValues = entry.OriginalValues;
            var currentValues = entry.CurrentValues;
            
            // 根据实体类型应用不同的解决策略
            if (entry.Entity is Product product)
            {
                await ResolveProductConcurrency(entry, databaseValues);
            }
            else if (entry.Entity is Order order)
            {
                await ResolveOrderConcurrency(entry, databaseValues);
            }
        }
        
        // 重试保存
        try
        {
            await context.SaveChangesAsync();
            saved = true;
        }
        catch (DbUpdateConcurrencyException)
        {
            // 如果再次失败,可能需要递归处理或放弃
        }
        
        return saved;
    }
    
    private static async Task ResolveProductConcurrency(
        EntityEntry entry, 
        PropertyValues databaseValues)
    {
        var databaseProduct = (Product)databaseValues.ToObject();
        var currentProduct = (Product)entry.Entity;
        
        // 解决策略:保留较高的库存值,其他字段使用最新值
        var resolvedStock = Math.Max(currentProduct.Stock, databaseProduct.Stock);
        
        // 更新实体
        entry.OriginalValues.SetValues(databaseValues);
        currentProduct.Stock = resolvedStock;
    }
}

3. 悲观并发(MySQL中使用锁)

使用SELECT … FOR UPDATE

// 使用原始SQL进行行级锁定
public async Task<Product> GetProductWithLockAsync(int productId)
{
    // 开始事务
    using (var transaction = await context.Database.BeginTransactionAsync(IsolationLevel.Serializable))
    {
        try
        {
            // 使用SELECT ... FOR UPDATE锁定行
            var product = await context.Products
                .FromSqlRaw("SELECT * FROM Products WHERE Id = {0} FOR UPDATE", productId)
                .AsNoTracking()
                .FirstOrDefaultAsync();
                
            // 执行需要原子性的操作
            // ...
            
            await transaction.CommitAsync();
            return product;
        }
        catch (Exception)
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
}

使用EF Core的锁机制

// 在EF Core中模拟行级锁
public async Task<bool> ReserveProductAsync(int productId, int quantity)
{
    using (var transaction = await context.Database.BeginTransactionAsync(IsolationLevel.Serializable))
    {
        try
        {
            // 获取产品并锁定行
            var product = await context.Products
                .FirstOrDefaultAsync(p => p.Id == productId);
                
            if (product == null || product.Stock < quantity)
            {
                await transaction.RollbackAsync();
                return false;
            }
            
            // 更新库存
            product.Stock -= quantity;
            await context.SaveChangesAsync();
            
            await transaction.CommitAsync();
            return true;
        }
        catch (Exception)
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
}

处理锁超时

// 在MySQL连接字符串中设置锁等待超时
var connectionString = "server=localhost;database=efcoredb;user=root;password=yourpassword;" +
    "Connection Timeout=30;Default Command Timeout=30;";

// 或者在DbContext中配置
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
    optionsBuilder.UseMySql(connectionString, ServerVersion.AutoDetect(connectionString),
        options => options.EnableRetryOnFailure().CommandTimeout(30));
}

// 处理锁超时异常
public async Task<bool> TryUpdateWithLockAsync(int productId, Action<Product> updateAction)
{
    const int maxRetries = 3;
    
    for (int attempt = 0; attempt < maxRetries; attempt++)
    {
        using (var transaction = await context.Database.BeginTransactionAsync(IsolationLevel.Serializable))
        {
            try
            {
                // 获取并锁定产品
                var product = await context.Products
                    .FirstOrDefaultAsync(p => p.Id == productId);
                    
                if (product == null)
                    return false;
                    
                // 执行更新操作
                updateAction(product);
                
                await context.SaveChangesAsync();
                await transaction.CommitAsync();
                
                return true;
            }
            catch (MySqlException ex) when (ex.Number == 1205) // Lock wait timeout
            {
                // 锁等待超时,重试
                await transaction.RollbackAsync();
                
                if (attempt == maxRetries - 1)
                    throw;
                    
                // 指数退避策略
                await Task.Delay(TimeSpan.FromMilliseconds(100 * Math.Pow(2, attempt)));
            }
            catch (Exception)
            {
                await transaction.RollbackAsync();
                throw;
            }
        }
    }
    
    return false;
}

表级锁定

// 使用表级锁定(谨慎使用,影响性能)
public async Task<bool> PerformMaintenanceAsync()
{
    using (var transaction = await context.Database.BeginTransactionAsync(IsolationLevel.Serializable))
    {
        try
        {
            // 锁定整个表
            await context.Database.ExecuteSqlRawAsync("LOCK TABLES Products WRITE");
            
            // 执行维护操作
            await context.Database.ExecuteSqlRawAsync("UPDATE Products SET Price = Price * 1.1");
            
            // 解锁表
            await context.Database.ExecuteSqlRawAsync("UNLOCK TABLES");
            
            await transaction.CommitAsync();
            return true;
        }
        catch (Exception)
        {
            await context.Database.ExecuteSqlRawAsync("UNLOCK TABLES");
            await transaction.RollbackAsync();
            throw;
        }
    }
}

4. 高级并发控制模式

使用版本号实现乐观并发

public class VersionedEntity
{
    public int Id { get; set; }
    public uint Version { get; set; } // 使用无符号整数作为版本号
}

// 配置版本号并发令牌
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    modelBuilder.Entity<Product>()
        .Property(p => p.Version)
        .IsConcurrencyToken()
        .ValueGeneratedOnAddOrUpdate();
}

// 在更新时自动增加版本号
public override int SaveChanges()
{
    var entries = ChangeTracker.Entries()
        .Where(e => e.State == EntityState.Modified || e.State == EntityState.Added);
        
    foreach (var entry in entries)
    {
        if (entry.Entity is VersionedEntity entity)
        {
            entity.Version++;
        }
    }
    
    return base.SaveChanges();
}

使用自定义锁机制

// 实现简单的应用级锁机制
public class ApplicationLockService
{
    private static readonly ConcurrentDictionary<string, SemaphoreSlim> Locks = 
        new ConcurrentDictionary<string, SemaphoreSlim>();
    
    public async Task<T> ExecuteWithLockAsync<T>(string lockKey, Func<Task<T>> operation)
    {
        var lockObject = Locks.GetOrAdd(lockKey, key => new SemaphoreSlim(1, 1));
        
        await lockObject.WaitAsync();
        
        try
        {
            return await operation();
        }
        finally
        {
            lockObject.Release();
        }
    }
}

// 使用应用级锁
public async Task<bool> UpdateProductSafelyAsync(int productId, Action<Product> updateAction)
{
    var lockService = new ApplicationLockService();
    var lockKey = $"product_{productId}";
    
    return await lockService.ExecuteWithLockAsync(lockKey, async () =>
    {
        using (var context = new ApplicationDbContext())
        {
            var product = await context.Products.FindAsync(productId);
            if (product == null) return false;
            
            updateAction(product);
            await context.SaveChangesAsync();
            return true;
        }
    });
}

5. 监控和诊断并发问题

记录并发冲突

public class ConcurrencyLogger
{
    private readonly ILogger<ConcurrencyLogger> _logger;
    
    public ConcurrencyLogger(ILogger<ConcurrencyLogger> logger)
    {
        _logger = logger;
    }
    
    public void LogConcurrencyConflict(DbUpdateConcurrencyException ex, string operation)
    {
        _logger.LogWarning("并发冲突发生在操作: {Operation}", operation);
        
        foreach (var entry in ex.Entries)
        {
            var databaseValues = entry.GetDatabaseValues();
            var currentValues = entry.CurrentValues;
            var originalValues = entry.OriginalValues;
            
            _logger.LogWarning("实体: {EntityType}", entry.Metadata.Name);
            _logger.LogWarning("数据库值: {DatabaseValues}", databaseValues?.Properties);
            _logger.LogWarning("当前值: {CurrentValues}", currentValues?.Properties);
            _logger.LogWarning("原始值: {OriginalValues}", originalValues?.Properties);
        }
    }
}

性能计数器监控

// 监控并发冲突率
public class ConcurrencyMonitor
{
    private long _totalOperations;
    private long _concurrencyExceptions;
    
    public void RecordOperation(bool hadConcurrencyException = false)
    {
        Interlocked.Increment(ref _totalOperations);
        
        if (hadConcurrencyException)
        {
            Interlocked.Increment(ref _concurrencyExceptions);
        }
    }
    
    public double GetConcurrencyExceptionRate()
    {
        var total = Interlocked.Read(ref _totalOperations);
        var exceptions = Interlocked.Read(ref _concurrencyExceptions);
        
        return total > 0 ? (double)exceptions / total : 0;
    }
}

总结

本文详细介绍了EF Core与MySQL中的事务和并发处理,包括:

  1. 事务管理:
    • 基本事务用法和TransactionScope

    • 设置事务隔离级别

    • 异步事务处理

  1. 乐观并发控制:

    • 配置并发令牌(时间戳或自定义字段)

    • 处理DbUpdateConcurrencyException

    • 实现自定义冲突解决策略

  2. 悲观并发控制:

    • 使用SELECT … FOR UPDATE进行行级锁定

    • 处理锁超时和重试机制

    • 表级锁定(谨慎使用)

  3. 高级并发模式:

    • 版本号实现乐观并发

    • 应用级锁机制

    • 监控和诊断并发问题

在实际应用中,应根据具体场景选择合适的并发控制策略:

  • 对于读多写少的场景,乐观并发通常更高效

  • 对于写密集或需要强一致性的场景,悲观并发可能更合适

  • 总是要考虑超时和重试机制,以提高系统的健壮性

  • 监控并发冲突率,及时发现和解决性能瓶颈

正确实施事务和并发控制是构建高并发、高可用应用程序的关键。建议在生产环境中进行充分的压力测试,确保并发控制策略能够满足实际需求。